Author: [log in to unmask] Date: Wed Apr 29 15:05:20 2015 New Revision: 2860 Log: Overhaul the EvioFileProducer class. Command line parsing is done with Apache CLI now. Default options are supplied so that only the EVIO files are required. Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java Wed Apr 29 15:05:20 2015 @@ -1,13 +1,22 @@ package org.hps.record.evio; +import java.io.BufferedReader; import java.io.File; -import java.net.InetAddress; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Arrays; import java.util.List; - +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.jlab.coda.et.EtAttachment; import org.jlab.coda.et.EtConstants; import org.jlab.coda.et.EtEvent; @@ -18,91 +27,143 @@ import org.jlab.coda.jevio.EventWriter; import org.jlab.coda.jevio.EvioEvent; import org.jlab.coda.jevio.EvioReader; +import org.lcsim.util.log.DefaultLogFormatter; +import org.lcsim.util.log.LogUtil; /** - * A utility class for streaming an EVIO file to an ET server. + * A command line utility for streaming EVIO files to an ET server. * <p> - * Original version was copied from the CODA group's ET java module. + * The original version was copied from the CODA ET module and modified. * * @author <a href="mailto:[log in to unmask]">Jeremy McCormick</a> */ -// TODO: Add option to set number of events in the put array. public final class EvioFileProducer { /** - * Flag to turn on/off debug print. - */ - private static final boolean debug = false; + * Default event count for printing event sequence number. + */ + private static final int DEFAULT_COUNT = 1000; + + /** + * Default delay time in milliseconds between events. + */ + private static final int DEFAULT_DELAY = 0; + + /** + * The default ET group. + */ + private static final int DEFAULT_ET_GROUP = 1; + + /** + * Default event buffer size (200 KB). + */ + private static final int DEFAULT_EVENT_SIZE = 200000; + + /** + * Default host name. + */ + private static final String DEFAULT_HOST = "localhost"; + + /** + * Default ET system name. + */ + private static final String DEFAULT_NAME = "ETBuffer"; + + /** + * Maximum port number of ET server (maximum value of a TCP/IP port). + */ + private static final int ET_PORT_MAX = 65535; + + /** + * Minimum port number of ET server (lower port numbers not allowed). + */ + private static final int ET_PORT_MIN = 1024; + + /** + * Setup the logger. + */ + private static final Logger LOGGER = LogUtil + .create(EvioFileProducer.class, new DefaultLogFormatter(), Level.CONFIG); + + /** + * The command line options. + */ + private static final Options OPTIONS = new Options(); + + /** + * Define the command line options. + */ + static { + OPTIONS.addOption("h", "help", false, "print help"); + OPTIONS.addOption("c", "count", true, "interval for printing event numbers"); + OPTIONS.addOption("d", "delay", true, "delay in milliseconds between events"); + OPTIONS.addOption("e", "file", true, "add an EVIO file"); + OPTIONS.addOption("f", "name", true, "ET system name which should be a buffer file"); + OPTIONS.addOption("g", "group", true, "group number of the events"); + OPTIONS.addOption("H", "host", true, "server host name"); + OPTIONS.addOption("l", "list", true, "text file with list of EVIO files"); + OPTIONS.addOption("L", "log", true, "log level (INFO, FINE, etc.)"); + OPTIONS.addOption("p", "port", true, "server port"); + OPTIONS.addOption("s", "size", true, "event buffer size in bytes"); + } /** * The externally accessible main method. * - * @param args The command line arguments. + * @param args the command line arguments */ public static void main(final String[] args) { - new EvioFileProducer().doMain(args); // call wrapper method - } - - /** - * Print usage statement. + new EvioFileProducer().run(args); + } + + /** + * Print usage statement and exit. */ private static void usage() { - System.out.println("\nUsage: java Producer -f <et name> -e <evio file> [-p <server port>] [-host <host>]" - + " [-d <delay in millisec>] [-g <group #>]\n\n" + " -f ET system's name\n" - + " -s size in bytes for requested events\n" - + " -p port number for a udp broadcast\n" - + " -g group number of new events to get\n" - + " -host host the ET system resides on (defaults to anywhere)\n\n" - + " This consumer works by making a connection to the\n" - + " ET system's tcp server port.\n"); + final HelpFormatter help = new HelpFormatter(); + help.printHelp("EvioFileProducer", OPTIONS); System.exit(1); } /** - * The byte buffer used to transfer data from EVIO to ET. - */ - private ByteBuffer byteBuffer; + * Event count for printing message. + */ + private int count = DEFAULT_COUNT; /** * A delay in milliseconds between put operations. */ - private int delay = 0; - - /** - * The ET system name which generally maps to a buffer file. - */ - private String etName; - - /** - * The list of input EVIO files. + private int delay = DEFAULT_DELAY; + + /** + * The ET system name which maps to a buffer file. + */ + private String etName = DEFAULT_NAME; + + /** + * The master list of input EVIO files to stream. */ private final List<File> evioFiles = new ArrayList<File>(); /** - * This is used for a "group" value when doing put but not sure what it actually does. - */ - private int group = 1; - - /** - * The server host name. - */ - private String host; - - /** - * The server's network port. + * The ET group (default is 1). + */ + private int group = DEFAULT_ET_GROUP; + + /** + * The server host name (default is localhost). + */ + private String host = DEFAULT_HOST; + + /** + * The server's network port (default is standard ET server port). */ private int port = EtConstants.serverPort; /** - * The EVIO reader used to read the input EVIO events. - */ - private EvioReader reader; - - /** - * The default ET event size. - */ - // FIXME: Should be a lot bigger? - private int size = 10000; // Default event size. + * The default ET event size (default is 200 KB). + */ + private int size = DEFAULT_EVENT_SIZE; /** * Class constructor. @@ -111,174 +172,211 @@ } /** - * Copy byte buffer to an <code>EtEvent</code>. + * Print the command line job configuration to the log. + */ + private void logConfig() { + final StringBuffer sb = new StringBuffer(); + sb.append("count = " + this.count + '\n'); + sb.append("delay = " + this.delay + '\n'); + sb.append("etName = " + this.etName + '\n'); + sb.append("group = " + this.group + '\n'); + sb.append("host = " + this.host + '\n'); + sb.append("port = " + this.port + '\n'); + sb.append("size = " + this.size + '\n'); + sb.append("EVIO files ..." + '\n'); + for (final File evioFile : this.evioFiles) { + sb.append(evioFile.getPath() + '\n'); + } + LOGGER.config(sb.toString()); + } + + /** + * Run the job by streaming all the EVIO files to the ET server using the command line arguments. * - * @param event The target EtEvent. - */ - public void copyToEtEvent(final EtEvent event) { - event.getDataBuffer().put(this.byteBuffer); - } - - /** - * Wrapper method called in main. - * - * @param args The command line arguments. - */ - public void doMain(final String[] args) { + * @param args the command line arguments + */ + public void run(final String[] args) { + + // Command line parser. + final PosixParser parser = new PosixParser(); + try { - for (int i = 0; i < args.length; i++) { - if (args[i].equalsIgnoreCase("-e")) { - // evioFileName = new String(args[++i]); - this.evioFiles.add(new File(args[++i])); - } else if (args[i].equalsIgnoreCase("-f")) { - this.etName = args[++i]; - } else if (args[i].equalsIgnoreCase("-host")) { - this.host = args[++i]; - } else if (args[i].equalsIgnoreCase("-p")) { - try { - this.port = Integer.parseInt(args[++i]); - if (this.port < 1024 || this.port > 65535) { - System.out.println("Port number must be between 1024 and 65535."); - usage(); - return; - } - } catch (final NumberFormatException ex) { - System.out.println("Did not specify a proper port number."); - usage(); - return; - } - } else if (args[i].equalsIgnoreCase("-s")) { - try { - this.size = Integer.parseInt(args[++i]); - if (this.size < 1) { - System.out.println("Size needs to be positive int."); - usage(); - return; - } - } catch (final NumberFormatException ex) { - System.out.println("Did not specify a proper size."); - usage(); - return; - } - } else if (args[i].equalsIgnoreCase("-g")) { - try { - this.group = Integer.parseInt(args[++i]); - if (this.group < 1 || this.group > 10) { - System.out.println("Group number must be between 0 and 10."); - usage(); - return; - } - } catch (final NumberFormatException ex) { - System.out.println("Did not specify a proper group number."); - usage(); - return; - } - } else if (args[i].equalsIgnoreCase("-d")) { - try { - this.delay = Integer.parseInt(args[++i]); - if (this.delay < 1) { - System.out.println("delay must be > 0."); - usage(); - return; - } - } catch (final NumberFormatException ex) { - System.out.println("Did not specify a proper delay."); - usage(); - return; - } - } else { - usage(); - return; - } - } - - if (this.host == null) { - // host = EtConstants.hostAnywhere; - this.host = InetAddress.getLocalHost().getHostName(); - } - - // ET name is required. - if (this.etName == null) { - System.out.println("EVIO file name argument is required"); - usage(); - return; - } - + + // Parse the command line arguments. + final CommandLine cl = parser.parse(OPTIONS, args); + + // Set the log level of this class before doing anything else. + if (cl.hasOption("L")) { + final Level level = Level.parse(cl.getOptionValue("L")); + // Default level is CONFIG so this message will always show. + LOGGER.config("Log level will be set to " + level + "."); + + // Set the new log level. This may suppress subsequent configuration print outs! + LOGGER.setLevel(level); + } + + // Add EVIO files to the job. + if (cl.hasOption("e")) { + for (final String fileName : cl.getOptionValues("e")) { + final File evioFile = new File(fileName); + LOGGER.config("adding EVIO file " + evioFile.getPath()); + this.evioFiles.add(evioFile); + } + } + + // Set ET name which is the buffer file. + if (cl.hasOption("f")) { + this.etName = cl.getOptionValue("f"); + } + + // Add EVIO files from a text file list, assuming one file path per line. + if (cl.hasOption("l")) { + final String filePath = cl.getOptionValue("l"); + final File listFile = new File(filePath); + if (!listFile.exists()) { + throw new IllegalArgumentException("The file " + listFile.getPath() + " does not exist."); + } + BufferedReader br = null; + try { + br = new BufferedReader(new InputStreamReader(new FileInputStream(listFile))); + String line; + while ((line = br.readLine()) != null) { + this.evioFiles.add(new File(line.trim())); + } + } finally { + if (br != null) { + br.close(); + } + } + } + + // Set host name. + if (cl.hasOption("H")) { + this.host = cl.getOptionValue("H"); + } + // if (this.host == null) { + // host = EtConstants.hostAnywhere; + // this.host = InetAddress.getLocalHost().getHostName(); + // } + + // Set the port number. + if (cl.hasOption("p")) { + this.port = Integer.parseInt(cl.getOptionValue("p")); + if (this.port < ET_PORT_MIN || this.port > ET_PORT_MAX) { + throw new IllegalArgumentException("Port number must be between 1024 and 65535."); + } + } + // Set the size of the event buffer in bytes. + if (cl.hasOption("s")) { + this.size = Integer.parseInt(cl.getOptionValue("s")); + if (this.size < 1) { + throw new IllegalArgumentException("Size needs to be positive int."); + } + } + + // Set the group number. + if (cl.hasOption("g")) { + this.group = Integer.parseInt(cl.getOptionValue("g")); + if (this.group < 1 || this.group > 10) { + throw new IllegalArgumentException("Group number must be between 0 and 10."); + } + } + + // Set the delay in milliseconds between putting events. + if (cl.hasOption("d")) { + this.delay = Integer.parseInt(cl.getOptionValue("d")); + if (this.delay < 1) { + throw new IllegalArgumentException("The delay must be > 0."); + } + } + + if (cl.hasOption("c")) { + this.count = Integer.parseInt(cl.getOptionValue("c")); + if (this.count < 1) { + throw new IllegalArgumentException("The count must be > 0."); + } + } + + // At least one EVIO file must be present. if (this.evioFiles.size() == 0) { - System.out.println("At least one input EVIO file is required."); - usage(); - return; + throw new IllegalArgumentException("At least one input EVIO file is required."); } // Check existence of EVIO files. - System.out.println("EVIO input files ..."); + LOGGER.info("Checking EVIO file list ... "); for (final File evioFile : this.evioFiles) { - System.out.println(evioFile.getPath()); if (!evioFile.exists()) { - System.err.println("EVIO file does not exist: " + evioFile.getPath()); - throw new RuntimeException("EVIO input file does not exist."); - } - } - - // Setup ET system with the command line config. + throw new IllegalArgumentException("EVIO input file does not exist: " + evioFile.getPath()); + } + } + LOGGER.info("EVIO file list was checked!"); + + // Print out the configuration for the job to the log. + logConfig(); + + } catch (final Exception e) { /* Catches errors in command line arguments. */ + // If there are errors parsing or validating the command line arguments then print usage and exit. + LOGGER.log(Level.SEVERE, "Error while processing command line options.", e); + usage(); + } + + EtSystem sys = null; + EvioReader reader = null; + + try { + + // Setup ET system from the command line options. final EtSystemOpenConfig config = new EtSystemOpenConfig(this.etName, this.host, this.port); - final EtSystem sys = new EtSystem(config, EtConstants.debugInfo); + sys = new EtSystem(config, EtConstants.debugInfo); sys.open(); final EtStation gc = sys.stationNameToObject("GRAND_CENTRAL"); final EtAttachment att = sys.attach(gc); - // array of events + // Array of ET events. EtEvent[] mevs; // Loop over input EVIO file list. for (final File evioFile : this.evioFiles) { - // Open EVIO reader. - System.out.println("Opening next EVIO file: " + evioFile.getPath()); - this.reader = new EvioReader(evioFile.getPath(), false); - - // Print number of events. - if (debug) { - System.out.println("EVIO file opened with " + this.reader.getEventCount() + " events."); - } - - // Ref to current EVIO event. + // Open a new EVIO reader. + LOGGER.info("Opening next EVIO file " + evioFile.getPath() + " ..."); + reader = new EvioReader(evioFile.getPath(), false); + LOGGER.info("Done opening file!"); + + // Print the number of events. + LOGGER.info("EVIO file opened with " + reader.getEventCount() + " events."); + + // Reference to the current EVIO event. EvioEvent event; - // Event sequence number; starts with 1. + // Event sequence number. int eventCount = 0; // Loop until event source is exhausted. - while (true) { - - // Get next event. - event = this.reader.nextEvent(); + while ((event = reader.nextEvent()) != null) { + + // Increment event count. ++eventCount; - if (eventCount % 1000 == 0) { - System.out.println("EvioFileProducer - event <" + eventCount + ">"); - } - if (event == null) { - break; - } - - // Try to parse the next event. + + // Print event sequence. + if (eventCount % this.count == 0) { + LOGGER.info("EVIO event " + eventCount); + } + try { - this.reader.parseEvent(event); - if (debug) { - System.out.println("event #" + event.getEventNumber() + " is " + event.getTotalBytes() - + " bytes"); - } - } catch (final Exception e) { + // Parse the next EVIO event. + reader.parseEvent(event); + LOGGER.finest("EVIO event " + event.getEventNumber() + " is " + event.getTotalBytes() + + " bytes."); + } catch (final Exception e) { /* Catches parse errors reading the EVIO events. */ e.printStackTrace(); - System.out.println("Error making EVIO event with sequence number <" + eventCount - + "> in file <" + evioFile.getPath() + ">."); + LOGGER.warning("Error making EVIO event with seq number " + eventCount + " in file " + + evioFile.getPath()); // Attempt to recover from errors by skipping to next event if there are exceptions. continue; } - if (debug) { - System.out.println("new events - size=" + this.size + "; group=" + this.group); - } + LOGGER.finest("new events - size=" + this.size + "; group=" + this.group); final int eventTag = EvioEventUtilities.getEventTag(event); @@ -296,7 +394,7 @@ Arrays.fill(control, eventTag); mevs[0].setControl(control); - // Delay for X millis if applicable. + // Apply delay in milliseconds. if (this.delay > 0) { Thread.sleep(this.delay); } @@ -309,36 +407,43 @@ try { writer.close(); } catch (final Exception e) { - System.out.println("Caught exception while closing writer."); - e.printStackTrace(); + LOGGER.log(Level.WARNING, "Error while closing writer.", e); } mevs[0].setLength(buf.position()); mevs[0].setByteOrder(ByteOrder.nativeOrder()); - if (debug) { - for (final EtEvent mev : mevs) { - System.out.println("event length = " + mev.getLength() + ", remaining bytes: " - + mev.getDataBuffer().remaining()); - } + + for (final EtEvent mev : mevs) { + LOGGER.finest("event length = " + mev.getLength() + ", remaining bytes: " + + mev.getDataBuffer().remaining()); } // Put events onto the ET ring. sys.putEvents(att, mevs); - if (debug) { - System.out.println("Wrote event #" + eventCount + " to ET"); - System.out.println("-------------------------------"); - ++eventCount; - } - } - - this.reader.close(); - } - - // Cleanup. - sys.close(); - - } catch (final Exception e) { - throw new RuntimeException(e); + LOGGER.finest("Sucessfully wrote " + eventCount + " event to ET which was EVIO event number " + + event.getEventNumber() + " from file " + evioFile.getPath() + "."); + } + reader.close(); + } + + } catch (final Exception e) { /* Catches all event processing errors. */ + // This catches and re-throws all errors from processing the EVIO events and configuring the ET system. + throw new RuntimeException("Error streaming EVIO events to ET system.", e); + } finally { + // Cleanup the EVIO reader if needed. + if (reader != null && !reader.isClosed()) { + try { + reader.close(); + } catch (final IOException e) { + LOGGER.log(Level.WARNING, e.getMessage(), e); + } + } + // Cleanup the ET system. + if (sys != null && sys.alive()) { + sys.close(); + } } + + LOGGER.info("Done!"); } }