Author: [log in to unmask]
Date: Wed Apr 29 15:05:20 2015
New Revision: 2860
Log:
Overhaul the EvioFileProducer class. Command line parsing is now done with Apache Commons CLI. Default values are supplied for all other options so that only the EVIO input files are required.
Modified:
java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java Wed Apr 29 15:05:20 2015
@@ -1,13 +1,22 @@
package org.hps.record.evio;
+import java.io.BufferedReader;
import java.io.File;
-import java.net.InetAddress;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.jlab.coda.et.EtAttachment;
import org.jlab.coda.et.EtConstants;
import org.jlab.coda.et.EtEvent;
@@ -18,91 +27,143 @@
import org.jlab.coda.jevio.EventWriter;
import org.jlab.coda.jevio.EvioEvent;
import org.jlab.coda.jevio.EvioReader;
+import org.lcsim.util.log.DefaultLogFormatter;
+import org.lcsim.util.log.LogUtil;
/**
- * A utility class for streaming an EVIO file to an ET server.
+ * A command line utility for streaming EVIO files to an ET server.
* <p>
- * Original version was copied from the CODA group's ET java module.
+ * The original version was copied from the CODA ET module and modified.
*
* @author <a href="mailto:[log in to unmask]">Jeremy McCormick</a>
*/
-// TODO: Add option to set number of events in the put array.
public final class EvioFileProducer {
/**
- * Flag to turn on/off debug print.
- */
- private static final boolean debug = false;
+ * Default event count for printing event sequence number.
+ */
+ private static final int DEFAULT_COUNT = 1000;
+
+ /**
+ * Default delay time in milliseconds between events.
+ */
+ private static final int DEFAULT_DELAY = 0;
+
+ /**
+ * The default ET group.
+ */
+ private static final int DEFAULT_ET_GROUP = 1;
+
+ /**
+ * Default event buffer size (200 KB).
+ */
+ private static final int DEFAULT_EVENT_SIZE = 200000;
+
+ /**
+ * Default host name.
+ */
+ private static final String DEFAULT_HOST = "localhost";
+
+ /**
+ * Default ET system name.
+ */
+ private static final String DEFAULT_NAME = "ETBuffer";
+
+ /**
+ * Maximum port number of ET server (maximum value of a TCP/IP port).
+ */
+ private static final int ET_PORT_MAX = 65535;
+
+ /**
+ * Minimum port number of ET server (lower port numbers not allowed).
+ */
+ private static final int ET_PORT_MIN = 1024;
+
+ /**
+ * Setup the logger.
+ */
+ private static final Logger LOGGER = LogUtil
+ .create(EvioFileProducer.class, new DefaultLogFormatter(), Level.CONFIG);
+
+ /**
+ * The command line options.
+ */
+ private static final Options OPTIONS = new Options();
+
+ /**
+ * Define the command line options.
+ */
+ static {
+ OPTIONS.addOption("h", "help", false, "print help");
+ OPTIONS.addOption("c", "count", true, "interval for printing event numbers");
+ OPTIONS.addOption("d", "delay", true, "delay in milliseconds between events");
+ OPTIONS.addOption("e", "file", true, "add an EVIO file");
+ OPTIONS.addOption("f", "name", true, "ET system name which should be a buffer file");
+ OPTIONS.addOption("g", "group", true, "group number of the events");
+ OPTIONS.addOption("H", "host", true, "server host name");
+ OPTIONS.addOption("l", "list", true, "text file with list of EVIO files");
+ OPTIONS.addOption("L", "log", true, "log level (INFO, FINE, etc.)");
+ OPTIONS.addOption("p", "port", true, "server port");
+ OPTIONS.addOption("s", "size", true, "event buffer size in bytes");
+ }
/**
* The externally accessible main method.
*
- * @param args The command line arguments.
+ * @param args the command line arguments
*/
public static void main(final String[] args) {
- new EvioFileProducer().doMain(args); // call wrapper method
- }
-
- /**
- * Print usage statement.
+ new EvioFileProducer().run(args);
+ }
+
+ /**
+ * Print usage statement and exit.
*/
private static void usage() {
- System.out.println("\nUsage: java Producer -f <et name> -e <evio file> [-p <server port>] [-host <host>]"
- + " [-d <delay in millisec>] [-g <group #>]\n\n" + " -f ET system's name\n"
- + " -s size in bytes for requested events\n"
- + " -p port number for a udp broadcast\n"
- + " -g group number of new events to get\n"
- + " -host host the ET system resides on (defaults to anywhere)\n\n"
- + " This consumer works by making a connection to the\n"
- + " ET system's tcp server port.\n");
+ final HelpFormatter help = new HelpFormatter();
+ help.printHelp("EvioFileProducer", OPTIONS);
System.exit(1);
}
/**
- * The byte buffer used to transfer data from EVIO to ET.
- */
- private ByteBuffer byteBuffer;
+ * Event count for printing message.
+ */
+ private int count = DEFAULT_COUNT;
/**
* A delay in milliseconds between put operations.
*/
- private int delay = 0;
-
- /**
- * The ET system name which generally maps to a buffer file.
- */
- private String etName;
-
- /**
- * The list of input EVIO files.
+ private int delay = DEFAULT_DELAY;
+
+ /**
+ * The ET system name which maps to a buffer file.
+ */
+ private String etName = DEFAULT_NAME;
+
+ /**
+ * The master list of input EVIO files to stream.
*/
private final List<File> evioFiles = new ArrayList<File>();
/**
- * This is used for a "group" value when doing put but not sure what it actually does.
- */
- private int group = 1;
-
- /**
- * The server host name.
- */
- private String host;
-
- /**
- * The server's network port.
+ * The ET group (default is 1).
+ */
+ private int group = DEFAULT_ET_GROUP;
+
+ /**
+ * The server host name (default is localhost).
+ */
+ private String host = DEFAULT_HOST;
+
+ /**
+ * The server's network port (default is standard ET server port).
*/
private int port = EtConstants.serverPort;
/**
- * The EVIO reader used to read the input EVIO events.
- */
- private EvioReader reader;
-
- /**
- * The default ET event size.
- */
- // FIXME: Should be a lot bigger?
- private int size = 10000; // Default event size.
+ * The ET event size in bytes (default is 200 KB).
+ */
+ private int size = DEFAULT_EVENT_SIZE;
/**
* Class constructor.
@@ -111,174 +172,211 @@
}
/**
- * Copy byte buffer to an <code>EtEvent</code>.
+ * Print the command line job configuration to the log.
+ */
+ private void logConfig() {
+ final StringBuffer sb = new StringBuffer();
+ sb.append("count = " + this.count + '\n');
+ sb.append("delay = " + this.delay + '\n');
+ sb.append("etName = " + this.etName + '\n');
+ sb.append("group = " + this.group + '\n');
+ sb.append("host = " + this.host + '\n');
+ sb.append("port = " + this.port + '\n');
+ sb.append("size = " + this.size + '\n');
+ sb.append("EVIO files ..." + '\n');
+ for (final File evioFile : this.evioFiles) {
+ sb.append(evioFile.getPath() + '\n');
+ }
+ LOGGER.config(sb.toString());
+ }
+
+ /**
+ * Run the job by streaming all the EVIO files to the ET server using the command line arguments.
*
- * @param event The target EtEvent.
- */
- public void copyToEtEvent(final EtEvent event) {
- event.getDataBuffer().put(this.byteBuffer);
- }
-
- /**
- * Wrapper method called in main.
- *
- * @param args The command line arguments.
- */
- public void doMain(final String[] args) {
+ * @param args the command line arguments
+ */
+ public void run(final String[] args) {
+
+ // Command line parser.
+ final PosixParser parser = new PosixParser();
+
try {
- for (int i = 0; i < args.length; i++) {
- if (args[i].equalsIgnoreCase("-e")) {
- // evioFileName = new String(args[++i]);
- this.evioFiles.add(new File(args[++i]));
- } else if (args[i].equalsIgnoreCase("-f")) {
- this.etName = args[++i];
- } else if (args[i].equalsIgnoreCase("-host")) {
- this.host = args[++i];
- } else if (args[i].equalsIgnoreCase("-p")) {
- try {
- this.port = Integer.parseInt(args[++i]);
- if (this.port < 1024 || this.port > 65535) {
- System.out.println("Port number must be between 1024 and 65535.");
- usage();
- return;
- }
- } catch (final NumberFormatException ex) {
- System.out.println("Did not specify a proper port number.");
- usage();
- return;
- }
- } else if (args[i].equalsIgnoreCase("-s")) {
- try {
- this.size = Integer.parseInt(args[++i]);
- if (this.size < 1) {
- System.out.println("Size needs to be positive int.");
- usage();
- return;
- }
- } catch (final NumberFormatException ex) {
- System.out.println("Did not specify a proper size.");
- usage();
- return;
- }
- } else if (args[i].equalsIgnoreCase("-g")) {
- try {
- this.group = Integer.parseInt(args[++i]);
- if (this.group < 1 || this.group > 10) {
- System.out.println("Group number must be between 0 and 10.");
- usage();
- return;
- }
- } catch (final NumberFormatException ex) {
- System.out.println("Did not specify a proper group number.");
- usage();
- return;
- }
- } else if (args[i].equalsIgnoreCase("-d")) {
- try {
- this.delay = Integer.parseInt(args[++i]);
- if (this.delay < 1) {
- System.out.println("delay must be > 0.");
- usage();
- return;
- }
- } catch (final NumberFormatException ex) {
- System.out.println("Did not specify a proper delay.");
- usage();
- return;
- }
- } else {
- usage();
- return;
- }
- }
-
- if (this.host == null) {
- // host = EtConstants.hostAnywhere;
- this.host = InetAddress.getLocalHost().getHostName();
- }
-
- // ET name is required.
- if (this.etName == null) {
- System.out.println("EVIO file name argument is required");
- usage();
- return;
- }
-
+
+ // Parse the command line arguments.
+ final CommandLine cl = parser.parse(OPTIONS, args);
+
+ // Set the log level of this class before doing anything else.
+ if (cl.hasOption("L")) {
+ final Level level = Level.parse(cl.getOptionValue("L"));
+ // Default level is CONFIG so this message will always show.
+ LOGGER.config("Log level will be set to " + level + ".");
+
+ // Set the new log level. This may suppress subsequent configuration print outs!
+ LOGGER.setLevel(level);
+ }
+
+ // Add EVIO files to the job.
+ if (cl.hasOption("e")) {
+ for (final String fileName : cl.getOptionValues("e")) {
+ final File evioFile = new File(fileName);
+ LOGGER.config("adding EVIO file " + evioFile.getPath());
+ this.evioFiles.add(evioFile);
+ }
+ }
+
+ // Set ET name which is the buffer file.
+ if (cl.hasOption("f")) {
+ this.etName = cl.getOptionValue("f");
+ }
+
+ // Add EVIO files from a text file list, assuming one file path per line.
+ if (cl.hasOption("l")) {
+ final String filePath = cl.getOptionValue("l");
+ final File listFile = new File(filePath);
+ if (!listFile.exists()) {
+ throw new IllegalArgumentException("The file " + listFile.getPath() + " does not exist.");
+ }
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new InputStreamReader(new FileInputStream(listFile)));
+ String line;
+ while ((line = br.readLine()) != null) {
+ this.evioFiles.add(new File(line.trim()));
+ }
+ } finally {
+ if (br != null) {
+ br.close();
+ }
+ }
+ }
+
+ // Set host name.
+ if (cl.hasOption("H")) {
+ this.host = cl.getOptionValue("H");
+ }
+ // if (this.host == null) {
+ // host = EtConstants.hostAnywhere;
+ // this.host = InetAddress.getLocalHost().getHostName();
+ // }
+
+ // Set the port number.
+ if (cl.hasOption("p")) {
+ this.port = Integer.parseInt(cl.getOptionValue("p"));
+ if (this.port < ET_PORT_MIN || this.port > ET_PORT_MAX) {
+ throw new IllegalArgumentException("Port number must be between 1024 and 65535.");
+ }
+ }
+ // Set the size of the event buffer in bytes.
+ if (cl.hasOption("s")) {
+ this.size = Integer.parseInt(cl.getOptionValue("s"));
+ if (this.size < 1) {
+ throw new IllegalArgumentException("Size needs to be positive int.");
+ }
+ }
+
+ // Set the group number.
+ if (cl.hasOption("g")) {
+ this.group = Integer.parseInt(cl.getOptionValue("g"));
+ if (this.group < 1 || this.group > 10) {
+ throw new IllegalArgumentException("Group number must be between 0 and 10.");
+ }
+ }
+
+ // Set the delay in milliseconds between putting events.
+ if (cl.hasOption("d")) {
+ this.delay = Integer.parseInt(cl.getOptionValue("d"));
+ if (this.delay < 1) {
+ throw new IllegalArgumentException("The delay must be > 0.");
+ }
+ }
+
+ if (cl.hasOption("c")) {
+ this.count = Integer.parseInt(cl.getOptionValue("c"));
+ if (this.count < 1) {
+ throw new IllegalArgumentException("The count must be > 0.");
+ }
+ }
+
+ // At least one EVIO file must be present.
if (this.evioFiles.size() == 0) {
- System.out.println("At least one input EVIO file is required.");
- usage();
- return;
+ throw new IllegalArgumentException("At least one input EVIO file is required.");
}
// Check existence of EVIO files.
- System.out.println("EVIO input files ...");
+ LOGGER.info("Checking EVIO file list ... ");
for (final File evioFile : this.evioFiles) {
- System.out.println(evioFile.getPath());
if (!evioFile.exists()) {
- System.err.println("EVIO file does not exist: " + evioFile.getPath());
- throw new RuntimeException("EVIO input file does not exist.");
- }
- }
-
- // Setup ET system with the command line config.
+ throw new IllegalArgumentException("EVIO input file does not exist: " + evioFile.getPath());
+ }
+ }
+ LOGGER.info("EVIO file list was checked!");
+
+ // Print out the configuration for the job to the log.
+ logConfig();
+
+ } catch (final Exception e) { /* Catches errors in command line arguments. */
+ // If there are errors parsing or validating the command line arguments then print usage and exit.
+ LOGGER.log(Level.SEVERE, "Error while processing command line options.", e);
+ usage();
+ }
+
+ EtSystem sys = null;
+ EvioReader reader = null;
+
+ try {
+
+ // Setup ET system from the command line options.
final EtSystemOpenConfig config = new EtSystemOpenConfig(this.etName, this.host, this.port);
- final EtSystem sys = new EtSystem(config, EtConstants.debugInfo);
+ sys = new EtSystem(config, EtConstants.debugInfo);
sys.open();
final EtStation gc = sys.stationNameToObject("GRAND_CENTRAL");
final EtAttachment att = sys.attach(gc);
- // array of events
+ // Array of ET events.
EtEvent[] mevs;
// Loop over input EVIO file list.
for (final File evioFile : this.evioFiles) {
- // Open EVIO reader.
- System.out.println("Opening next EVIO file: " + evioFile.getPath());
- this.reader = new EvioReader(evioFile.getPath(), false);
-
- // Print number of events.
- if (debug) {
- System.out.println("EVIO file opened with " + this.reader.getEventCount() + " events.");
- }
-
- // Ref to current EVIO event.
+ // Open a new EVIO reader.
+ LOGGER.info("Opening next EVIO file " + evioFile.getPath() + " ...");
+ reader = new EvioReader(evioFile.getPath(), false);
+ LOGGER.info("Done opening file!");
+
+ // Print the number of events.
+ LOGGER.info("EVIO file opened with " + reader.getEventCount() + " events.");
+
+ // Reference to the current EVIO event.
EvioEvent event;
- // Event sequence number; starts with 1.
+ // Event sequence number.
int eventCount = 0;
// Loop until event source is exhausted.
- while (true) {
-
- // Get next event.
- event = this.reader.nextEvent();
+ while ((event = reader.nextEvent()) != null) {
+
+ // Increment event count.
++eventCount;
- if (eventCount % 1000 == 0) {
- System.out.println("EvioFileProducer - event <" + eventCount + ">");
- }
- if (event == null) {
- break;
- }
-
- // Try to parse the next event.
+
+ // Print event sequence.
+ if (eventCount % this.count == 0) {
+ LOGGER.info("EVIO event " + eventCount);
+ }
+
try {
- this.reader.parseEvent(event);
- if (debug) {
- System.out.println("event #" + event.getEventNumber() + " is " + event.getTotalBytes()
- + " bytes");
- }
- } catch (final Exception e) {
+ // Parse the next EVIO event.
+ reader.parseEvent(event);
+ LOGGER.finest("EVIO event " + event.getEventNumber() + " is " + event.getTotalBytes()
+ + " bytes.");
+ } catch (final Exception e) { /* Catches parse errors reading the EVIO events. */
e.printStackTrace();
- System.out.println("Error making EVIO event with sequence number <" + eventCount
- + "> in file <" + evioFile.getPath() + ">.");
+ LOGGER.warning("Error making EVIO event with seq number " + eventCount + " in file "
+ + evioFile.getPath());
// Attempt to recover from errors by skipping to next event if there are exceptions.
continue;
}
- if (debug) {
- System.out.println("new events - size=" + this.size + "; group=" + this.group);
- }
+ LOGGER.finest("new events - size=" + this.size + "; group=" + this.group);
final int eventTag = EvioEventUtilities.getEventTag(event);
@@ -296,7 +394,7 @@
Arrays.fill(control, eventTag);
mevs[0].setControl(control);
- // Delay for X millis if applicable.
+ // Apply delay in milliseconds.
if (this.delay > 0) {
Thread.sleep(this.delay);
}
@@ -309,36 +407,43 @@
try {
writer.close();
} catch (final Exception e) {
- System.out.println("Caught exception while closing writer.");
- e.printStackTrace();
+ LOGGER.log(Level.WARNING, "Error while closing writer.", e);
}
mevs[0].setLength(buf.position());
mevs[0].setByteOrder(ByteOrder.nativeOrder());
- if (debug) {
- for (final EtEvent mev : mevs) {
- System.out.println("event length = " + mev.getLength() + ", remaining bytes: "
- + mev.getDataBuffer().remaining());
- }
+
+ for (final EtEvent mev : mevs) {
+ LOGGER.finest("event length = " + mev.getLength() + ", remaining bytes: "
+ + mev.getDataBuffer().remaining());
}
// Put events onto the ET ring.
sys.putEvents(att, mevs);
- if (debug) {
- System.out.println("Wrote event #" + eventCount + " to ET");
- System.out.println("-------------------------------");
- ++eventCount;
- }
- }
-
- this.reader.close();
- }
-
- // Cleanup.
- sys.close();
-
- } catch (final Exception e) {
- throw new RuntimeException(e);
+ LOGGER.finest("Sucessfully wrote " + eventCount + " event to ET which was EVIO event number "
+ + event.getEventNumber() + " from file " + evioFile.getPath() + ".");
+ }
+ reader.close();
+ }
+
+ } catch (final Exception e) { /* Catches all event processing errors. */
+ // This catches and re-throws all errors from processing the EVIO events and configuring the ET system.
+ throw new RuntimeException("Error streaming EVIO events to ET system.", e);
+ } finally {
+ // Cleanup the EVIO reader if needed.
+ if (reader != null && !reader.isClosed()) {
+ try {
+ reader.close();
+ } catch (final IOException e) {
+ LOGGER.log(Level.WARNING, e.getMessage(), e);
+ }
+ }
+ // Cleanup the ET system.
+ if (sys != null && sys.alive()) {
+ sys.close();
+ }
}
+
+ LOGGER.info("Done!");
}
}
|