Author: [log in to unmask] Date: Wed May 20 14:39:52 2015 New Revision: 3006 Log: Fix dumb bugs; add javadoc; other minor restructuring. Added: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/package-info.java Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/DateFileFilter.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EpicsLog.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EventTypeLog.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileFilter.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileSequenceComparator.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileVisitor.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunFilter.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunSummary.java Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/DateFileFilter.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/DateFileFilter.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/DateFileFilter.java Wed May 20 14:39:52 2015 @@ -7,14 +7,34 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.Date; +/** + * Filter a file on its creation date. 
+ * <p> + * Files with a creation date after the time stamp will be rejected. + * + * @author Jeremy McCormick + */ final class DateFileFilter implements FileFilter { + /** + * The cut off time stamp. + */ private final Date date; + /** + * Create a filter with the given date as the cut off. + * + * @param date the time stamp cut off + */ DateFileFilter(final Date date) { this.date = date; } + /** + * Return <code>true</code> if the file was created before the time stamp date. + * + * @return <code>true</code> if file was created before the time stamp date + */ @Override public boolean accept(final File pathname) { BasicFileAttributes attr = null; Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EpicsLog.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EpicsLog.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EpicsLog.java Wed May 20 14:39:52 2015 @@ -3,26 +3,55 @@ import java.util.HashMap; import java.util.Map; +import org.hps.record.epics.EpicsData; import org.hps.record.epics.EpicsEvioProcessor; -import org.hps.record.epics.EpicsData; import org.hps.record.evio.EvioEventProcessor; import org.jlab.coda.jevio.EvioEvent; -public final class EpicsLog extends EvioEventProcessor { +/** + * Create a summary log of EPICS information found in EVIO events. + * + * @author Jeremy McCormick + */ +final class EpicsLog extends EvioEventProcessor { + /** + * A count of how many times a given EPICS variable is found in the input, e.g. for computing the mean value across the run. + */ private final Map<String, Integer> counts = new HashMap<String, Integer>(); + /** + * The current EPICS data block from the EVIO events (last one that was found). + */ private EpicsData currentData; + /** + * The summary information for the variables from computing the mean across the whole run. 
+ */ private final EpicsData logData = new EpicsData(); + + /** + * The processor for extracting the EPICS information from EVIO events. + */ private final EpicsEvioProcessor processor = new EpicsEvioProcessor(); + /** + * Reference to the run summary which will contain the EPICs information. + */ private final RunSummary runSummary; + /** + * Create an EPICs log pointing to a run summary. + * + * @param runSummary the run summary + */ EpicsLog(final RunSummary runSummary) { this.runSummary = runSummary; } + /** + * End of job hook which computes the mean values for all EPICS variables found in the run. + */ @Override public void endJob() { System.out.println(this.logData); @@ -38,13 +67,21 @@ this.runSummary.setEpicsData(this.logData); } + /** + * Process a single EVIO event, setting the current EPICS data and updating the variable counts. + */ @Override public void process(final EvioEvent evioEvent) { this.processor.process(evioEvent); this.currentData = this.processor.getEpicsScalarData(); - update(); + this.update(); } + /** + * Update state from the current EPICS data. + * <p> + * If the current data is null, this method does nothing. 
+ */ private void update() { if (this.currentData != null) { for (final String name : this.currentData.getUsedNames()) { @@ -59,8 +96,7 @@ this.counts.put(name, count); final double value = this.logData.getValue(name) + this.currentData.getValue(name); this.logData.setValue(name, value); - System.out.println(name + " => added " + this.currentData.getValue(name) + "; total = " + value - + "; mean = " + value / count); + System.out.println(name + " => added " + this.currentData.getValue(name) + "; total = " + value + "; mean = " + value / count); } } } Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EventTypeLog.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EventTypeLog.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EventTypeLog.java Wed May 20 14:39:52 2015 @@ -8,11 +8,28 @@ import org.hps.record.evio.EvioEventProcessor; import org.jlab.coda.jevio.EvioEvent; -public class EventTypeLog extends EvioEventProcessor { +/** + * This class makes a log of the number of different event types found in a run by their tag value. + * + * @author Jeremy McCormick + */ +final class EventTypeLog extends EvioEventProcessor { - Map<Object, Integer> eventTypeCounts = new HashMap<Object, Integer>(); - RunSummary runSummary; + /** + * The event tag counts for the run. + */ + private final Map<Object, Integer> eventTypeCounts = new HashMap<Object, Integer>(); + /** + * The run summary to update. + */ + private final RunSummary runSummary; + + /** + * Create the log pointing to a run summary. + * + * @param runSummary the run summary + */ EventTypeLog(final RunSummary runSummary) { this.runSummary = runSummary; for (final EventTagConstant constant : EventTagConstant.values()) { @@ -23,15 +40,28 @@ } } + /** + * End of job hook which sets the event type counts on the run summary. 
+ */ @Override public void endJob() { this.runSummary.setEventTypeCounts(this.eventTypeCounts); } + /** + * Get the counts of different event types (physics events, PRESTART, etc.). + * + * @return a map of event types to their counts + */ Map<Object, Integer> getEventTypeCounts() { return this.eventTypeCounts; } + /** + * Process an EVIO event and add its type to the map. + * + * @param event the EVIO event + */ @Override public void process(final EvioEvent event) { for (final EventTagConstant constant : EventTagConstant.values()) { Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java Wed May 20 14:39:52 2015 @@ -21,32 +21,51 @@ import org.lcsim.util.log.LogUtil; /** - * Crawls EVIO files in a directory tree, groups the files that are found by run, and optionally performs various tasks - * based on the run summary information that is accumulated, including printing a summary, caching the files from JLAB - * MSS, and updating a run database. + * Crawls EVIO files in a directory tree, groups the files that are found by run, and optionally performs various tasks based on the run summary + * information that is accumulated, including printing a summary, caching the files from JLAB MSS, and updating a run database. * * @author <a href="mailto:[log in to unmask]">Jeremy McCormick</a> */ public final class EvioFileCrawler { - private static final Logger LOGGER = LogUtil.create(EvioFileVisitor.class, new DefaultLogFormatter(), Level.ALL); - + /** + * Setup logger. + */ + private static final Logger LOGGER = LogUtil.create(EvioFileCrawler.class, new DefaultLogFormatter(), Level.ALL); + + /** + * Constant for milliseconds conversion. 
+ */ + private static final long MILLISECONDS = 1000L; + + /** + * Command line options for the crawler. + */ private static final Options OPTIONS = new Options(); + /** + * Statically define the command options. + */ static { OPTIONS.addOption("h", "help", false, "print help and exit"); - OPTIONS.addOption("t", "timestamp-file", true, - "timestamp file for date filtering; modified time will be set at end of job"); + OPTIONS.addOption("t", "timestamp-file", true, "timestamp file for date filtering; modified time will be set at end of job"); OPTIONS.addOption("d", "directory", true, "starting directory"); OPTIONS.addOption("r", "runs", true, "list of runs to accept (others will be excluded)"); OPTIONS.addOption("s", "summary", false, "print run summary at end of job"); OPTIONS.addOption("L", "log-level", true, "set log level (INFO, FINE, etc.)"); OPTIONS.addOption("u", "update", false, "update the run database"); OPTIONS.addOption("e", "epics", false, "process EPICS data"); - OPTIONS.addOption("c", "cache", false, "cache all files from MSS"); - OPTIONS.addOption("w", "wait", true, "total time in ms for file caching"); - } - + OPTIONS.addOption("c", "cache", false, "automatically cache all files from MSS"); + OPTIONS.addOption("w", "wait", true, "total time in seconds to allow for file caching"); + OPTIONS.addOption("m", "max-files", true, "maximum number of files to accept per run (for debugging)"); + OPTIONS.addOption("p", "print", true, "set event printing interval when running EVIO processors"); + } + + /** + * Support running the crawler from the command line. + * + * @param args the command line arguments + */ public static void main(final String[] args) { try { new EvioFileCrawler().parse(args).run(); @@ -55,52 +74,110 @@ } } + /** + * A list of run numbers to accept in the job. + */ private final Set<Integer> acceptRuns = new HashSet<Integer>(); + /** + * The class for managing the file caching using the jcache command. 
+ */ + private final JCacheManager cacheManager = new JCacheManager(); + + /** + * Default event print interval. + */ + private final int DEFAULT_EVENT_PRINT_INTERVAL = 1000; + + /** + * Flag indicating whether EPICS data banks should be processed. + */ private boolean epics = false; + /** + * Interval for printing out event number while running EVIO processors. + */ + private int eventPrintInterval = DEFAULT_EVENT_PRINT_INTERVAL; + + /** + * The maximum number of files to + */ + private int maxFiles = -1; + + /** + * The options parser. + */ private final PosixParser parser = new PosixParser(); + /** + * Flag indicating whether the run summaries should be printed (may result in some extra file processing). + */ private boolean printSummary = false; + /** + * The root directory to crawl which defaults to the current directory. + */ private File rootDir = new File(System.getProperty("user.dir")); + /** + * A timestamp to use for filtering input files on their creation date. + */ private Date timestamp = null; + /** + * A file to use for the timestamp date. + */ private File timestampFile = null; + /** + * Flag indicating if the run database should be updated from results of the job. + */ private boolean update = false; - - private boolean cache = false; - - private Long waitTime; - + + /** + * Flag indicating if the file cache should be used (e.g. jcache automatically executed to move files to the cache disk from tape). + */ + private boolean useFileCache = false; + + /** + * The maximum wait time in milliseconds to allow for file caching operations. + */ + private Long waitTime; + + /** + * Create the processor for a single run. 
+ * + * @param runSummary the run summary for the run + * @return the run processor + */ private RunProcessor createRunProcessor(final RunSummary runSummary) { - final RunProcessor processor = new RunProcessor(runSummary); + final RunProcessor processor = new RunProcessor(runSummary, this.cacheManager); if (this.epics) { processor.addProcessor(new EpicsLog(runSummary)); } if (this.printSummary) { processor.addProcessor(new EventTypeLog(runSummary)); } + if (this.maxFiles != -1) { + processor.setMaxFiles(this.maxFiles); + } + processor.useFileCache(this.useFileCache); + processor.setEventPrintInterval(this.eventPrintInterval); return processor; } - - /** - * Print the usage statement for this tool to the console. - */ - private final void printUsage() { - final HelpFormatter help = new HelpFormatter(); - help.printHelp("EvioFileCrawler", "", OPTIONS, ""); - System.exit(0); - } - + + /** + * Parse command line options, but do not start the job. + * + * @param args the command line arguments + * @return the configured crawler object + */ private EvioFileCrawler parse(final String args[]) { try { final CommandLine cl = this.parser.parse(OPTIONS, args); - + if (cl.hasOption("h")) { - printUsage(); + this.printUsage(); } if (cl.hasOption("L")) { @@ -122,12 +199,10 @@ if (cl.hasOption("t")) { this.timestampFile = new File(cl.getOptionValue("t")); if (!this.timestampFile.exists()) { - throw new IllegalArgumentException("The timestamp file does not exist: " - + this.timestampFile.getPath()); + throw new IllegalArgumentException("The timestamp file does not exist: " + this.timestampFile.getPath()); } try { - this.timestamp = new Date(Files - .readAttributes(this.timestampFile.toPath(), BasicFileAttributes.class).lastModifiedTime() + this.timestamp = new Date(Files.readAttributes(this.timestampFile.toPath(), BasicFileAttributes.class).lastModifiedTime() .toMillis()); } catch (final IOException e) { throw new RuntimeException("Error getting attributes of timestamp file.", e); 
@@ -149,17 +224,28 @@ if (cl.hasOption("u")) { this.update = true; } - + if (cl.hasOption("e")) { this.epics = true; } - + if (cl.hasOption("c")) { - this.cache = true; - } - + this.useFileCache = true; + } + if (cl.hasOption("w")) { - this.waitTime = Long.parseLong(cl.getOptionValue("w")); + this.waitTime = Long.parseLong(cl.getOptionValue("w")) * MILLISECONDS; + if (this.waitTime > 0L) { + this.cacheManager.setWaitTime(this.waitTime); + } + } + + if (cl.hasOption("m")) { + this.maxFiles = Integer.parseInt(cl.getOptionValue("m")); + } + + if (cl.hasOption("p")) { + this.eventPrintInterval = Integer.parseInt(cl.getOptionValue("p")); } } catch (final ParseException e) { @@ -169,63 +255,54 @@ return this; } - private void cacheFiles(final RunLog runs) { - - JCacheManager cache = new JCacheManager(); - - if (this.waitTime != null) { - LOGGER.config("set jcache wait time to " + waitTime + " millis"); - cache.setWaitTime(waitTime); - } - - // Process all files in the runs. + /** + * Print the usage statement for this tool to the console. + */ + private void printUsage() { + final HelpFormatter help = new HelpFormatter(); + help.printHelp("EvioFileCrawler", "", OPTIONS, ""); + System.exit(0); + } + + /** + * Process a single run. + * + * @param runSummary the run summary information + * @throws Exception if there is some error while running the file processing + */ + private void processRun(final RunSummary runSummary) throws Exception { + + // Clear the cache manager. + this.cacheManager.clear(); + + // Create a processor to process all the EVIO events in the run. + final RunProcessor processor = this.createRunProcessor(runSummary); + + // Process all of the runs files. + processor.process(); + } + + /** + * Process all runs that were found. 
+ * + * @param runs the run log containing the list of run summaries + * @throws Exception if there is an error processing one of the runs + */ + private void processRuns(final RunLog runs) throws Exception { + // Process all of the runs that were found. for (final int run : runs.getSortedRunNumbers()) { - - LOGGER.info("processing run " + run + " files ..."); - - // Get the run summary for the run. - final RunSummary runSummary = runs.getRunSummary(run); - - // Cache all the files. - LOGGER.info("caching " + runSummary.getFiles().size() + " files ..."); - cache.cache(runSummary.getFiles()); - - // Wait for cache operation to complete. - boolean cached = cache.waitForAll(); - - LOGGER.info("files were cached: " + cached); - - if (!cached) { - throw new RuntimeException("The cache operation did not complete in time."); - } - - LOGGER.info("done caching run " + run); - } - } - - private void processRuns(final RunLog runs) throws Exception { - - // Process all files in the runs. - for (final int run : runs.getSortedRunNumbers()) { - - LOGGER.info("processing run " + run + " ..."); - - // Get the run summary for the run. - final RunSummary runSummary = runs.getRunSummary(run); - - // Create a processor to process all the EVIO records in the run. - final RunProcessor processor = createRunProcessor(runSummary); - - // Process the run, updating the run summary. - processor.process(); - - LOGGER.info("done processing run " + run); - } - } - + this.processRun(runs.getRunSummary(run)); + } + } + + /** + * Run the crawler job (generally will take a long time!). 
+ * + * @throws Exception if there is some error during the job + */ public void run() throws Exception { final EnumSet<FileVisitOption> options = EnumSet.noneOf(FileVisitOption.class); - final EvioFileVisitor visitor = new EvioFileVisitor(); + final EvioFileVisitor visitor = new EvioFileVisitor(this.timestamp); if (this.timestamp != null) { visitor.addFilter(new DateFileFilter(this.timestamp)); LOGGER.config("added date filter with timestamp " + this.timestamp); @@ -242,25 +319,20 @@ } final RunLog runs = visitor.getRunLog(); - + // Print run numbers that were found. - StringBuffer sb = new StringBuffer(); - for (Integer run : runs.getSortedRunNumbers()) { + final StringBuffer sb = new StringBuffer(); + for (final Integer run : runs.getSortedRunNumbers()) { sb.append(run + " "); } LOGGER.info("found files from runs: " + sb.toString()); - + // Sort files on their sequence numbers. LOGGER.fine("sorting files by sequence ..."); runs.sortAllFiles(); - - // Cache all the files to disk before processing them. - if (this.cache) { - cacheFiles(runs); - } - - // Process all the files in the runs. - processRuns(runs); + + // Process all the files in all of runs. This will perform caching from MSS if necessary. + this.processRuns(runs); // Print the run summary information. if (this.printSummary) { @@ -273,7 +345,7 @@ runs.insert(); } - // Update the timestamp file. + // Update the timestamp file which can be used to tell which files have been processed. 
if (this.timestampFile == null) { this.timestampFile = new File("timestamp"); try { Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileFilter.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileFilter.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileFilter.java Wed May 20 14:39:52 2015 @@ -3,16 +3,27 @@ import java.io.File; import java.io.FileFilter; +/** + * This is a simple file filter that will accept EVIO files with a certain convention to their naming which looks like <i>FILENAME.evio.SEQUENCE</i>. + * This matches the convention used by the CODA DAQ software. + * + * @author Jeremy McCormick + */ final class EvioFileFilter implements FileFilter { - @Override + /** + * Return <code>true</code> if file is an EVIO file with correct file name convention. + * + * @return <code>true</code> if file is an EVIO file with correct file name convention + */ + @Override public boolean accept(final File pathname) { - boolean isEvio = pathname.getName().contains(".evio"); + final boolean isEvio = pathname.getName().contains(".evio"); boolean hasSeqNum = false; try { EvioFileUtilities.getSequenceNumber(pathname); hasSeqNum = true; - } catch (Exception e) { + } catch (final Exception e) { } return isEvio && hasSeqNum; } Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java Wed May 20 14:39:52 2015 @@ -1,7 +1,6 @@ package org.hps.record.evio.crawler; import java.io.File; -import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -12,41 
+11,42 @@ import java.util.Map; import java.util.logging.Logger; -import org.jlab.coda.jevio.EvioException; -import org.jlab.coda.jevio.EvioReader; import org.lcsim.util.log.LogUtil; +/** + * This is a list of <code>File</code> objects that are assumed to be EVIO files. There are some added utilities for getting the total number of + * events in all the files. + * + * @author Jeremy McCormick + */ final class EvioFileList extends ArrayList<File> { + /** + * Setup logger. + */ private static final Logger LOGGER = LogUtil.create(EvioFileList.class); - Map<File, Integer> eventCounts = new HashMap<File, Integer>(); + /** + * Event count by file. + */ + private final Map<File, Integer> eventCounts = new HashMap<File, Integer>(); - void computeEventCount(EvioReader reader, final File file) throws IOException, EvioException { - if (!reader.getPath().equals(file.getPath())) { - throw new IllegalArgumentException("The EvioReader and file paths do not match."); - } - LOGGER.info("computing event count for " + file.getPath() + " ..."); - int eventCount = reader.getEventCount(); - this.eventCounts.put(file, eventCount); - LOGGER.info("done computing event count for " + file.getPath()); - } - - int computeTotalEvents() { - LOGGER.info("computing total events ..."); - int totalEvents = 0; - for (final File file : this) { - getEventCount(file); - totalEvents += this.eventCounts.get(file); - } - LOGGER.info("done computing total events"); - return totalEvents; - } - + /** + * Get the first file. + * + * @return the first file + */ File first() { return this.get(0); } + /** + * Get the event count for an EVIO file. 
+ * + * @param file the EVIO file + * @return the event count for the file + * @throws RuntimeException if the count was never computed (file is not in map) + */ int getEventCount(final File file) { if (this.eventCounts.get(file) == null) { throw new RuntimeException("The event count for " + file.getPath() + " was never computed."); @@ -54,11 +54,38 @@ return this.eventCounts.get(file); } + /** + * Get the total number of events. + * <p> + * Files which do not have their event counts computed will be ignored. + * + * @return the total number of events + */ + int getTotalEvents() { + int totalEvents = 0; + for (final File file : this) { + if (this.eventCounts.containsKey(file)) { + totalEvents += this.eventCounts.get(file); + } else { + // Warn about non-computed count. + // FIXME: Perhaps this should actually be a fatal error. + LOGGER.warning("event count for " + file.getPath() + " was not computed and will not be reflected in total"); + } + } + return totalEvents; + } + + /** + * Insert the file names into the run database. + * + * @param connection the database connection + * @param run the run number + * @throws SQLException if there is a problem executing one of the database queries + */ void insert(final Connection connection, final int run) throws SQLException { LOGGER.info("updating file list ..."); PreparedStatement filesStatement = null; - filesStatement = connection - .prepareStatement("INSERT INTO run_log_files (run, directory, name) VALUES(?, ?, ?)"); + filesStatement = connection.prepareStatement("INSERT INTO run_log_files (run, directory, name) VALUES(?, ?, ?)"); LOGGER.info("inserting files from run " + run + " into database"); for (final File file : this) { LOGGER.info("creating update statement for " + file.getPath()); @@ -71,10 +98,28 @@ LOGGER.info("run_log_files was updated!"); } + /** + * Get the last file. + * + * @return the last file + */ File last() { return this.get(this.size() - 1); } + /** + * Set the event count for a file. 
+ * + * @param file the EVIO file + * @param eventCount the event count + */ + void setEventCount(final File file, final Integer eventCount) { + this.eventCounts.put(file, eventCount); + } + + /** + * Sort the files in-place by their sequence number. + */ void sort() { final List<File> fileList = new ArrayList<File>(this); Collections.sort(fileList, new EvioFileSequenceComparator()); Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileSequenceComparator.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileSequenceComparator.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileSequenceComparator.java Wed May 20 14:39:52 2015 @@ -3,8 +3,18 @@ import java.io.File; import java.util.Comparator; +/** + * Compare two EVIO files by their sequence numbers. + * + * @author Jeremy McCormick + */ final class EvioFileSequenceComparator implements Comparator<File> { + /** + * Compare two EVIO files by their sequence numbers. + * + * @return -1 if the first file's sequence number is less than the second's; 0 if equal; 1 if greater than + */ @Override public int compare(final File o1, final File o2) { final Integer sequenceNumber1 = EvioFileUtilities.getSequenceNumber(o1); Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java Wed May 20 14:39:52 2015 @@ -13,12 +13,48 @@ import org.jlab.coda.jevio.EvioReader; import org.lcsim.util.log.LogUtil; -public final class EvioFileUtilities { - +/** + * A miscellaneous collection of EVIO file utility methods used by classes in the crawler package. 
+ * + * @author Jeremy McCormick + */ +final class EvioFileUtilities { + + /** + * Setup logger. + */ private static final Logger LOGGER = LogUtil.create(EvioFileUtilities.class); + /** + * Milliseconds constant for conversion to/from second. + */ private static final long MILLISECONDS = 1000L; + /** + * Get a cached file path assuming that the input file is on the JLAB MSS. + * + * @param file the MSS file path + * @return the cached file path + * @throws IllegalArgumentException if the file is not on the MSS (e.g. path does not start with "/mss") + */ + static File getCachedFile(final File file) { + if (!isMssFile(file)) { + throw new IllegalArgumentException("File " + file.getPath() + " is not on the JLab MSS."); + } + if (isCachedFile(file)) { + throw new IllegalArgumentException("File " + file.getPath() + " is already on the cache disk."); + } + return new File("/cache" + file.getPath()); + } + + /** + * Get the date from the control bank of an EVIO event. + * + * @param file the EVIO file + * @param eventTag the event tag on the bank + * @param gotoEvent an event to start the scanning + * @return the control bank date or null if not found + */ static Date getControlDate(final File file, final int eventTag, final int gotoEvent) { Date date = null; EvioReader reader = null; @@ -52,6 +88,12 @@ return date; } + /** + * Get the date from the head bank. + * + * @param event the EVIO file + * @return the date from the head bank or null if not found + */ static Date getHeadBankDate(final EvioEvent event) { Date date = null; final BaseStructure headBank = EvioEventUtilities.getHeadBank(event); @@ -65,6 +107,12 @@ return date; } + /** + * Get the run end date which is taken either from the END event or the last physics event is the END event is not found. 
+ * + * @param file the EVIO file + * @return the run end date + */ static Date getRunEnd(final File file) { Date date = getControlDate(file, EvioEventConstants.END_EVENT_TAG, -10); if (date == null) { @@ -95,6 +143,13 @@ return date; } + /** + * Get the run number from the file name. + * + * @param file the EVIO file + * @return the run number + * @throws Exception if there is a problem parsing out the run number + */ static Integer getRunFromName(final File file) { final String name = file.getName(); final int startIndex = name.lastIndexOf("_") + 1; @@ -102,6 +157,14 @@ return Integer.parseInt(name.substring(startIndex, endIndex)); } + /** + * Get the run start date from an EVIO file (should be the first in the run). + * <p> + * This is taken from the PRESTART event. + * + * @param file the EVIO file + * @return the run start date + */ static Date getRunStart(final File file) { Date date = getControlDate(file, EvioEventConstants.PRESTART_EVENT_TAG, 0); if (date == null) { @@ -131,46 +194,80 @@ return date; } + /** + * Get the EVIO file sequence number (the number at the end of the file name). + * + * @param file the EVIO file + * @return the file's sequence number + * @throws Exception if there is an error parsing out the sequence number + */ static Integer getSequenceNumber(final File file) { final String name = file.getName(); return Integer.parseInt(name.substring(name.lastIndexOf(".") + 1)); } + /** + * Return <code>true</code> if this is a cached file e.g. the path starts with "/cache". + * + * @param file the file + * @return <code>true</code> if the file is a cached file + */ + static boolean isCachedFile(final File file) { + return file.getPath().startsWith("/cache"); + } + + /** + * Return <code>true</code> if this file is on the JLAB MSS e.g. the path starts with "/mss". 
+ * + * @param file the file + * @return <code>true</code> if the file is on the MSS + */ + static boolean isMssFile(final File file) { + return file.getPath().startsWith("/mss"); + } + + /** + * Open an EVIO file using a <code>EvioReader</code>. + * + * @param file the EVIO file + * @return the new <code>EvioReader</code> for the file + * @throws IOException if there is an IO problem + * @throws EvioException if there is an error reading the EVIO data + */ static EvioReader open(final File file) throws IOException, EvioException { return open(file, false); } - - static EvioReader open(final File file, boolean sequential) throws IOException, EvioException { + + /** + * Open an EVIO file, using the cached file path if necessary. + * + * @param file the EVIO file + * @param sequential <code>true</code> to enable sequential reading + * @return the new <code>EvioReader</code> for the file + * @throws IOException if there is an IO problem + * @throws EvioException if there is an error reading the EVIO data + */ + static EvioReader open(final File file, final boolean sequential) throws IOException, EvioException { File openFile = file; if (isMssFile(file)) { openFile = getCachedFile(file); - } + } final long start = System.currentTimeMillis(); final EvioReader reader = new EvioReader(openFile, false, sequential); final long end = System.currentTimeMillis() - start; LOGGER.info("opened " + openFile.getPath() + " in " + end / MILLISECONDS + " seconds"); return reader; } - + + /** + * Open an EVIO from a path. 
+ * + * @param path the file path + * @return the new <code>EvioReader</code> for the file + * @throws IOException if there is an IO problem + * @throws EvioException if there is an error reading the EVIO data + */ static EvioReader open(final String path) throws IOException, EvioException { return open(new File(path)); - } - - static File getCachedFile(File file) { - if (!isMssFile(file)) { - throw new IllegalArgumentException("File " + file.getPath() + " is not on the JLab MSS."); - } - if (isCachedFile(file)) { - throw new IllegalArgumentException("File " + file.getPath() + " is already on the cache disk."); - } - return new File("/cache" + file.getPath()); - } - - static boolean isMssFile(File file) { - return file.getPath().startsWith("/mss"); - } - - static boolean isCachedFile(File file) { - return file.getPath().startsWith("/cache"); } } Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileVisitor.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileVisitor.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileVisitor.java Wed May 20 14:39:52 2015 @@ -7,60 +7,112 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; +import org.lcsim.util.log.DefaultLogFormatter; import org.lcsim.util.log.LogUtil; +/** + * A file visitor that crawls directories looking for EVIO files. + * + * @author Jeremy McCormick + */ final class EvioFileVisitor extends SimpleFileVisitor<Path> { - private static final Logger LOGGER = LogUtil.create(EvioFileVisitor.class); + /** + * Setup logger. 
+ */ + private static final Logger LOGGER = LogUtil.create(EvioFileVisitor.class, new DefaultLogFormatter(), Level.FINE); + /** + * A list of file filters to apply. + */ private final List<FileFilter> filters = new ArrayList<FileFilter>(); + /** + * The run log containing information about files from each run. + */ private final RunLog runs = new RunLog(); - EvioFileVisitor() { - addFilter(new EvioFileFilter()); + /** + * Create a new file visitor. + * + * @param timestamp the timestamp which is used for date filtering + */ + EvioFileVisitor(final Date timestamp) { + this.addFilter(new EvioFileFilter()); + if (timestamp != null) { + // Add date filter if timestamp is supplied. + this.addFilter(new DateFileFilter(timestamp)); + } } + /** + * Run the filters on the file to tell whether it should be accepted or not. + * + * @param file the EVIO file + * @return <code>true</code> if file should be accepted + */ private boolean accept(final File file) { boolean accept = true; for (final FileFilter filter : this.filters) { accept = filter.accept(file); if (accept == false) { - LOGGER.fine(filter.getClass().getSimpleName() + " rejected file: " + file.getPath()); + LOGGER.finer(filter.getClass().getSimpleName() + " rejected " + file.getPath()); break; } } return accept; } + /** + * Add a file filter. + * + * @param filter the file filter + */ void addFilter(final FileFilter filter) { this.filters.add(filter); - LOGGER.config("added filter: " + filter.getClass().getSimpleName()); + LOGGER.config("added " + filter.getClass().getSimpleName() + " filter"); } + /** + * Get the run log. + * + * @return the run log + */ RunLog getRunLog() { return this.runs; } + /** + * Visit a single file. 
+ * + * @param path the file to visit + * @param attrs the file attributes + */ @Override public FileVisitResult visitFile(final Path path, final BasicFileAttributes attrs) { + final File file = path.toFile(); + if (this.accept(file)) { - final File file = path.toFile(); - if (accept(file)) { + // Get the run number from the file name. + final Integer run = EvioFileUtilities.getRunFromName(file); - final Integer run = EvioFileUtilities.getRunFromName(file); + // Get the sequence number from the file name. final Integer seq = EvioFileUtilities.getSequenceNumber(file); - LOGGER.info("adding file: " + file.getPath() + "; run: " + run + "; seq = " + seq); + LOGGER.info("accepted file " + file.getPath() + " with run " + run + " and seq " + seq); + // Add this file to the file list for the run. this.runs.getRunSummary(run).addFile(file); } else { - LOGGER.info("rejected file: " + file.getPath()); + // File was rejected by one of the filters. + LOGGER.finer("rejected file " + file.getPath()); } + // Always continue crawling. return FileVisitResult.CONTINUE; } } Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java Wed May 20 14:39:52 2015 @@ -21,109 +21,420 @@ /** * Utility class for using the <i>jcache</i> command at JLAB. 
+ * + * @author Jeremy McCormick */ -public class JCacheManager { - - private static Logger LOGGER = LogUtil.create(JCacheManager.class, new DefaultLogFormatter(), Level.ALL); - - private Map<File, FileInfo> fileInfos = new HashMap<File, FileInfo>(); - - private static long DEFAULT_MAX_WAIT_TIME = 300000; - - private long maxWaitTime = DEFAULT_MAX_WAIT_TIME; - - private static final long POLL_WAIT_TIME = 10000; - - static class FileInfo { - +final class JCacheManager { + + /** + * Keeps track of cache status for a single file. + * + * @author Jeremy McCormick + */ + static class CacheStatus { + + /** + * Flag indicating if file is cached yet. + */ + private boolean cached = false; + + /** + * Path to the file on the MSS (not the cached path). + */ + private File file = null; + + /** + * The request ID from the 'jcache submit' command. + */ private Integer requestId = null; - private File file = null; - private boolean cached = false; - - FileInfo(File file, Integer requestId) { + + /** + * The current status from executing the 'jcache request' command. + */ + private String status; + + /** + * Create a new <code>CacheStatus</code> object. + * + * @param file the file which has the MSS path + * @param requestId the request ID from running the cache command + */ + CacheStatus(final File file, final Integer requestId) { + this.file = file; this.requestId = requestId; } - File getCachedFile() { - return new File("/cache" + file.getPath()); - } - + /** + * Get the file (path on MSS). + * + * @return the file with path on the MSS + */ + File getFile() { + return this.file; + } + + /** + * Get the request ID. + * + * @return the request ID + */ Integer getRequestId() { - return requestId; - } - + return this.requestId; + } + + /** + * Get the request XML from running 'jcache request' to get the status. 
+ * + * @param is the input stream from the process + * @return the root element of the request XML + */ + private Element getRequestXml(final InputStream is) { + String xmlString = null; + try { + xmlString = readFully(is, "US-ASCII"); + } catch (final IOException e) { + throw new RuntimeException(e); + } + // LOGGER.finer("raw XML: " + xmlString); + xmlString = xmlString.substring(xmlString.trim().indexOf("<?xml") + 1); + // LOGGER.finer("cleaned XML: " + xmlString); + return buildDocument(xmlString).getRootElement(); + } + + /** + * Get the status string. + * + * @param updateStatus <code>true</code> to run the 'jcache request' command + * @return the status string + */ + String getStatus(final boolean updateStatus) { + if (updateStatus) { + this.update(); + } + return this.status; + } + + /** + * Return <code>true</code> if file is cached. + * + * @return <code>true</code> if file is cached + */ boolean isCached() { - return cached; - } - - void update() { - if (!isCached()) { - if (!isPending() && getCachedFile().exists()) { - LOGGER.info("file " + file.getPath() + " is cached"); - this.cached = true; - } - } - } - - String getStatus() { + return this.cached; + } + + /** + * Return <code>true</code> if status is "done". + * + * @return <code>true</code> if status is "done" + */ + boolean isDone() { + return "done".equals(this.status); + } + + /** + * Return <code>true</code> if status is "hit". + * + * @return <code>true</code> if status is "hit" + */ + boolean isHit() { + return "hit".equals(this.status); + } + + /** + * Return <code>true</code> if status is "pending". + * + * @return <code>true</code> if status is "pending" + */ + boolean isPending() { + return "pending".equals(this.status); + } + + /** + * Request the file status string using the 'jcache request' command. 
+ * + * @return the file status string + */ + private String requestFileStatus() { Process process = null; try { - process = new ProcessBuilder("jcache", "request", requestId.toString()).start(); + process = new ProcessBuilder(JCACHE_COMMAND, "request", this.requestId.toString()).start(); } catch (final IOException e) { throw new RuntimeException(e); - } + } int status = 0; try { status = process.waitFor(); - } catch (InterruptedException e) { - throw new RuntimeException("Process was interrupted.", e); + } catch (final InterruptedException e) { + throw new RuntimeException("Cache process was interrupted.", e); } if (status != 0) { - throw new RuntimeException("The jcache request process returned a non-zero exit status: " + status); - } - return getRequestXml(process.getInputStream()).getChild("request").getChildText("status"); - } - - private Element getRequestXml(InputStream is) { - String xmlString = null; - try { - xmlString = readFully(is, "US-ASCII"); - } catch (IOException e) { - throw new RuntimeException(e); - } - xmlString = xmlString.substring(xmlString.trim().indexOf("<?xml")); - LOGGER.info(xmlString); - return buildDocument(xmlString).getRootElement(); - } - - boolean isPending() { - return !"pending".equals(getStatus()); - } - - boolean isDone() { - return "done".equals(getStatus()); - } - - boolean isHit() { - return "hit".equals(getStatus()); - } - } - - void setWaitTime(long maxWaitTime) { - this.maxWaitTime = maxWaitTime; - } - - void cache(List<File> files) { - for (File file : files) { - cache(file); - } - } - - void cache(File file) { + throw new RuntimeException("The jcache request returned an error status: " + status); + } + return this.getRequestXml(process.getInputStream()).getChild("request").getChild("file").getChildText("status"); + } + + /** + * Update the cache status. 
+ */ + void update() { + this.status = this.requestFileStatus(); + if (this.isDone() || this.isHit()) { + this.cached = true; + } + } + } + + /** + * The default max wait time in milliseconds for all file caching operations to complete (default is ~5 minutes). + */ + private static long DEFAULT_MAX_WAIT_TIME = 300000; + + /** + * The command for running jcache (accessible from ifarm machines and batch nodes). + */ + private static final String JCACHE_COMMAND = "/site/bin/jcache"; + + /** + * Setup the logger. + */ + private static Logger LOGGER = LogUtil.create(JCacheManager.class, new DefaultLogFormatter(), Level.FINE); + + /** + * Time to wait between polling of all files (~10 seconds). + */ + private static final long POLL_WAIT_TIME = 10000; + + /** + * Build an XML document from a string. + * + * @param xmlString the raw XML string + * @return the XML document + */ + private static Document buildDocument(final String xmlString) { + final SAXBuilder builder = new SAXBuilder(); + Document document = null; + try { + document = builder.build(new InputSource(new StringReader(xmlString))); + } catch (final Exception e) { + throw new RuntimeException("Error building XML doc.", e); + } + return document; + } + + /** + * Read bytes from an input stream into an array. + * + * @param inputStream the input stream + * @return the bytes read + * @throws IOException if there is a problem reading the stream + */ + private static byte[] readFully(final InputStream inputStream) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final byte[] buffer = new byte[1024]; + int length = 0; + while ((length = inputStream.read(buffer)) != -1) { + baos.write(buffer, 0, length); + } + return baos.toByteArray(); + } + + /** + * Read bytes from an input stream into a string with a certain encoding. 
+ * + * @param inputStream the input stream + * @param encoding the encoding + * @return the output string + * @throws IOException if there is a problem reading from the stream + */ + private static String readFully(final InputStream inputStream, final String encoding) throws IOException { + return new String(readFully(inputStream), encoding); + } + + /** + * The current cache statuses mapped by <code>File</code> object. + */ + private final Map<File, CacheStatus> cacheStatuses = new HashMap<File, CacheStatus>(); + + /** + * The maximum time to wait for all caching operations to complete. + */ + private long maxWaitTime = DEFAULT_MAX_WAIT_TIME; + + /** + * The time when the caching operation starts. + */ + long start = 0; + + /** + * Cache a file by submitting a 'jcache submit' process. + * <p> + * The resulting cache request will be registered with this manager until the {@link #clear()} method is called. + * + * @param file the file to cache which should be a path on the JLAB MSS (e.g. starts with '/mss') + */ + private void cache(final File file) { if (!EvioFileUtilities.isMssFile(file)) { LOGGER.severe("file " + file.getPath() + " is not on the MSS"); throw new IllegalArgumentException("Only files on the MSS can be cached."); } + + if (EvioFileUtilities.getCachedFile(file).exists()) { + // Assume (maybe unreasonably?!) that since the file already exists it will stay in the cache for the duration of the job. + LOGGER.fine(file.getPath() + " is already on the cache disk so cache request is ignored"); + } else { + + // Execute the submit process. + final Process process = this.submit(file); + + // Parse out the request ID from the process output. + final Integer requestId = this.getRequestId(process); + + // Register the request with the manager. 
+ final CacheStatus cacheStatus = new CacheStatus(file, requestId); + this.cacheStatuses.put(file, cacheStatus); + + LOGGER.info("jcache submitted for " + file.getPath() + " with req ID '" + requestId + "'"); + } + } + + /** + * Submit cache request for every file in a list. + * + * @param files the list of files to cache + */ + void cache(final List<File> files) { + for (final File file : files) { + this.cache(file); + } + } + + /** + * Return <code>true</code> if all files registered with the manager are cached. + * + * @return <code>true</code> if all files registered with the manager are cached + */ + boolean checkCacheStatus() { + + // Flag which will be changed to false if we find non-cached files in the loop. + boolean allCached = true; + + // Loop over all cache statuses and refresh/check them. + for (final Entry<File, CacheStatus> entry : this.cacheStatuses.entrySet()) { + + // Get the cache status for a single file. + final CacheStatus cacheStatus = entry.getValue(); + + LOGGER.info("checking status of " + cacheStatus.getFile().getPath() + " with req ID '" + cacheStatus.getRequestId() + "' ..."); + + // Is this file flagged as non-cached? + if (!cacheStatus.isCached()) { + + LOGGER.info("updating status of " + cacheStatus.getFile().getPath() + " ..."); + + // Update the cache status to see if it changed since last check. + cacheStatus.update(); + + // Is status still non-cached after status update? + if (!cacheStatus.isCached()) { + + // Set flag which indicates at least one file is not cached yet. + allCached = false; + + LOGGER.info(entry.getKey() + " is NOT cached with status " + cacheStatus.getStatus(false)); + } else { + // Log that this file is now cached. It will not be checked next time. + LOGGER.info(cacheStatus.getFile().getPath() + " is cached with status " + cacheStatus.getStatus(false)); + } + } else { + LOGGER.info(cacheStatus.getFile().getPath() + " is already cached"); + } + } + return allCached; + } + + /** + * Clear all cache statuses. 
+ */ + void clear() { + this.cacheStatuses.clear(); + this.start = 0; + LOGGER.info("CacheManager state was cleared."); + } + + /** + * Get the request ID from a process that ran the 'jcache submit' command. + * + * @param process the system process + * @return the request ID + */ + private Integer getRequestId(final Process process) { + String output = null; + try { + output = readFully(process.getInputStream(), "US-ASCII"); + } catch (final IOException e) { + throw new RuntimeException(e); + } + return Integer.parseInt(output.substring(output.indexOf("'") + 1, output.lastIndexOf("'"))); + } + + /** + * Get the number of files that are not cached. + * + * @return the number of files that are not cached + */ + int getUncachedCount() { + int nUncached = 0; + for (final Entry<File, CacheStatus> entry : this.cacheStatuses.entrySet()) { + if (!entry.getValue().isCached()) { + nUncached += 1; + } + } + return nUncached; + } + + /** + * Set the maximum wait time for caching to complete. + * + * @param maxWaitTime the maximum wait time for caching to complete + */ + void setWaitTime(final long maxWaitTime) { + this.maxWaitTime = maxWaitTime; + LOGGER.config("max wait time set to " + maxWaitTime + " ms"); + } + + /** + * Sleep after checking cache statuses. 
+ * + * @return <code>true</code> if <code>maxWaitTime</code> is exceeded or method is interrupted + */ + private boolean sleep() { + final long elapsed = System.currentTimeMillis() - this.start; + LOGGER.info("elapsed time is " + elapsed + " ms"); + if (elapsed > this.maxWaitTime) { + LOGGER.warning("max wait time of " + this.maxWaitTime + " ms was exceeded while caching files"); + return true; + } + final Object lock = new Object(); + synchronized (lock) { + try { + LOGGER.info("waiting " + POLL_WAIT_TIME + " ms before checking cache again ..."); + lock.wait(POLL_WAIT_TIME); + } catch (final InterruptedException e) { + e.printStackTrace(); + return true; + } + } + return false; + } + + /** + * Submit a cache request for a file. + * + * @param file the file + * @return the system process for the cache request command + */ + private Process submit(final File file) { Process process = null; try { process = new ProcessBuilder("jcache", "submit", "default", file.getPath()).start(); @@ -133,88 +444,62 @@ int status = 0; try { status = process.waitFor(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new RuntimeException("Process was interrupted.", e); } if (status != 0) { - throw new RuntimeException("The jcache process returned a non-zero exit status: " + status); - } - String output = null; - try { - output = readFully(process.getInputStream(), "US-ASCII"); - } catch (IOException e) { - throw new RuntimeException(e); - } - Integer requestId = Integer.parseInt(output.substring(output.indexOf("'") + 1, output.lastIndexOf("'"))); - FileInfo fileInfo = new FileInfo(file, requestId); - fileInfos.put(file, fileInfo); - LOGGER.info("jcache submitted for " + file.getPath() + " with req ID '" + requestId + "'"); - } - - private static Document buildDocument(String xmlString) { - SAXBuilder builder = new SAXBuilder(); - Document document = null; - try { - builder.build(new InputSource(new StringReader(xmlString))); - } catch (Exception e) 
{ - e.printStackTrace(); - } - return document; - } - - boolean waitForAll() { - if (this.fileInfos.isEmpty()) { - throw new IllegalStateException("There are no files being cached."); - } + throw new RuntimeException("The jcache process returned an error status of " + status); + } + return process; + } + + /** + * Wait for all files registered with the manager to be cached or until a timeout occurs. + * + * @return <code>true</code> if all files are successfully cached + */ + boolean waitForCache() { + LOGGER.info("waiting for files to be cached ..."); - long elapsed = 0; + + if (this.cacheStatuses.isEmpty()) { + throw new IllegalStateException("There are no files registered with the cache manager."); + } + + // This is the return value which will be changed to true if all files are cached successfully. boolean cached = false; + + // Get the start time so we can calculate later if max wait time is exceeded. + this.start = System.currentTimeMillis(); + + // Keep checking files until they are all cached or the max wait time is exceeded. while (!cached) { - boolean check = true; - INFO_LOOP: for (Entry<File, FileInfo> entry : fileInfos.entrySet()) { - FileInfo info = entry.getValue(); - info.update(); - if (!info.isCached()) { - LOGGER.info(entry.getKey() + " is not cached with status " + info.getStatus()); - check = false; - break INFO_LOOP; - } - } - if (check) { + + // Check cache status of all files. This will return true if all files are cached. + final boolean allCached = this.checkCacheStatus(); + + // If all cache requests have succeeded then break from loop and set cache status to true. + if (allCached) { cached = true; break; - } - - elapsed = System.currentTimeMillis(); - LOGGER.info("elapsed time: " + elapsed + " ms"); - if (elapsed > maxWaitTime) { + } else { + LOGGER.info(this.getUncachedCount() + " files still uncached"); + } + + // Sleep for awhile before checking the cache statuses again. 
+ // This will return true if max wait time is exceeded and the wait should be stopped. + final boolean waitTimeExceeded = this.sleep(); + + // Break from loop if the max wait time was exceeded. + if (waitTimeExceeded) { break; } - Object lock = new Object(); - synchronized(lock) { - try { - LOGGER.info("waiting " + POLL_WAIT_TIME + " ms before checking again ..."); - lock.wait(POLL_WAIT_TIME); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - LOGGER.info("files were cached: " + cached); + } + if (cached) { + LOGGER.info("all files cached successfully!"); + } else { + LOGGER.warning("failed to cache all files!"); + } return cached; } - - private static String readFully(InputStream inputStream, String encoding) throws IOException { - return new String(readFully(inputStream), encoding); - } - - private static byte[] readFully(InputStream inputStream) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] buffer = new byte[1024]; - int length = 0; - while ((length = inputStream.read(buffer)) != -1) { - baos.write(buffer, 0, length); - } - return baos.toByteArray(); - } } Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunFilter.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunFilter.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunFilter.java Wed May 20 14:39:52 2015 @@ -4,13 +4,36 @@ import java.io.FileFilter; import java.util.Set; +/** + * A filter which rejects files that have a run number not in the accept list. + * + * @author Jeremy McCormick + */ final class RunFilter implements FileFilter { - Set<Integer> acceptRuns; + /** + * Set of run numbers to accept. + */ + private final Set<Integer> acceptRuns; + + /** + * Create a new <code>RunFilter</code> with a set of runs to accept. 
+ * + * @param acceptRuns the set of runs to accept + */ RunFilter(final Set<Integer> acceptRuns) { + if (acceptRuns.isEmpty()) { + throw new IllegalArgumentException("the acceptRuns collection is empty"); + } this.acceptRuns = acceptRuns; } + /** + * Returns <code>true</code> if file is accepted (its run number is in the set). + * + * @param file the EVIO file + * @return <code>true</code> if file is accepted + */ @Override public boolean accept(final File file) { return this.acceptRuns.contains(EvioFileUtilities.getRunFromName(file)); Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java Wed May 20 14:39:52 2015 @@ -14,12 +14,34 @@ import org.hps.conditions.database.ConnectionParameters; import org.lcsim.util.log.LogUtil; +/** + * This class contains summary information about a series of runs that are themselves modeled with the {@link RunSummary} class. These can be looked + * up by their run number. + * <p> + * This class is able to update the run database using the <code>insert</code> methods. + * + * @author Jeremy McCormick + */ final class RunLog { + /** + * Setup logging. + */ private static final Logger LOGGER = LogUtil.create(RunLog.class); - Map<Integer, RunSummary> runs = new HashMap<Integer, RunSummary>(); + /** + * A map between run numbers and the run summary information. + */ + private final Map<Integer, RunSummary> runs = new HashMap<Integer, RunSummary>(); + /** + * Get a run summary by run number. + * <p> + * It will be created if it does not exist. 
+ * + * @param run the run number + * @return the <code>RunSummary</code> for the run number + */ public RunSummary getRunSummary(final int run) { if (!this.runs.containsKey(run)) { LOGGER.info("creating new RunSummary for run " + run); @@ -28,12 +50,22 @@ return this.runs.get(run); } + /** + * Get a list of sorted run numbers in this run log. + * <p> + * This is a copy of the keys from the map so modifying it will have no effect on this class. + * + * @return the list of sorted run numbers + */ List<Integer> getSortedRunNumbers() { final List<Integer> runList = new ArrayList<Integer>(this.runs.keySet()); Collections.sort(runList); return runList; } + /** + * Insert all the information from the run log into the run database. + */ void insert() { LOGGER.info("inserting runs into run_log ..."); @@ -42,9 +74,9 @@ try { connection.setAutoCommit(false); - insertRunLog(connection); + this.insertRunLog(connection); - insertFiles(connection); + this.insertFiles(connection); connection.commit(); @@ -67,24 +99,36 @@ } } - void insertFiles(final Connection connection) throws SQLException { - for (final int run : getSortedRunNumbers()) { - getRunSummary(run).getFiles().insert(connection, run); + /** + * Insert the file lists into the run database. + * + * @param connection the database connection + * @throws SQLException if there is an error executing the SQL query + */ + private void insertFiles(final Connection connection) throws SQLException { + for (final int run : this.getSortedRunNumbers()) { + this.getRunSummary(run).getEvioFileList().insert(connection, run); } } - void insertRunLog(final Connection connection) throws SQLException { + /** + * Insert the run summary information into the database. 
+ * + * @param connection the database connection + * @throws SQLException if there is an error querying the database + */ + private void insertRunLog(final Connection connection) throws SQLException { PreparedStatement runLogStatement = null; runLogStatement = connection .prepareStatement("INSERT INTO run_log (run, start_date, end_date, nevents, nfiles, end_ok, last_updated) VALUES(?, ?, ?, ?, ?, ?, NOW())"); - for (final Integer run : getSortedRunNumbers()) { + for (final Integer run : this.getSortedRunNumbers()) { LOGGER.info("preparing to insert run " + run + " into database .."); final RunSummary runSummary = this.runs.get(run); runLogStatement.setInt(1, run); runLogStatement.setTimestamp(2, new java.sql.Timestamp(runSummary.getStartDate().getTime())); runLogStatement.setTimestamp(3, new java.sql.Timestamp(runSummary.getEndDate().getTime())); runLogStatement.setInt(4, runSummary.getTotalEvents()); - runLogStatement.setInt(5, runSummary.getFiles().size()); + runLogStatement.setInt(5, runSummary.getEvioFileList().size()); runLogStatement.setBoolean(6, runSummary.isEndOkay()); runLogStatement.executeUpdate(); LOGGER.info("committed run " + run + " to run_log"); @@ -92,12 +136,18 @@ LOGGER.info("run_log was updated!"); } + /** + * Print out the run summaries to <code>System.out</code>. + */ void printRunSummaries() { for (final int run : this.runs.keySet()) { this.runs.get(run).printRunSummary(System.out); } } + /** + * Sort all the file lists in place (by sequence number). 
+ */ void sortAllFiles() { for (final Integer run : this.runs.keySet()) { this.runs.get(run).sortFiles(); Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java Wed May 20 14:39:52 2015 @@ -3,77 +3,286 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Date; import java.util.List; +import java.util.logging.Level; import java.util.logging.Logger; +import org.hps.record.evio.EvioEventConstants; import org.hps.record.evio.EvioEventProcessor; import org.jlab.coda.jevio.EvioEvent; import org.jlab.coda.jevio.EvioException; import org.jlab.coda.jevio.EvioReader; +import org.lcsim.util.log.DefaultLogFormatter; import org.lcsim.util.log.LogUtil; -public final class RunProcessor { - - private static final Logger LOGGER = LogUtil.create(RunProcessor.class); - - List<EvioEventProcessor> processors = new ArrayList<EvioEventProcessor>(); - - RunSummary runSummary; - - RunProcessor(final RunSummary runSummary) { +/** + * Processes all the EVIO files from a run. + * <p> + * This class is a wrapper for activating different sub-tasks, including optionally caching all files from the JLAB MSS to the cache disk using + * jcache. + * <p> + * There is also a list of processors which is run on all events from the run, if the processor list is not empty. + * + * @author Jeremy McCormick + */ +final class RunProcessor { + + /** + * Setup logger. + */ + private static final Logger LOGGER = LogUtil.create(RunProcessor.class, new DefaultLogFormatter(), Level.FINE); + + /** + * The cache manager. + */ + private final JCacheManager cacheManager; + + /** + * The event printing interval when processing EVIO files. 
+ */ + private int eventPrintInterval = 1000; + + /** + * Max files to read (defaults to unlimited). + */ + private int maxFiles = -1; + + /** + * The list of EVIO processors to run on the files that are found. + */ + private final List<EvioEventProcessor> processors = new ArrayList<EvioEventProcessor>(); + + /** + * The run summary information updated by running this processor. + */ + private final RunSummary runSummary; + + /** + * Set to <code>true</code> to use file caching. + */ + private boolean useFileCache; + + /** + * Create a new run processor. + * + * @param runSummary the run summary to update + * @param cacheManager the cache manager for executing 'jcache' commands + */ + RunProcessor(final RunSummary runSummary, final JCacheManager cacheManager) { this.runSummary = runSummary; - } - + this.cacheManager = cacheManager; + } + + /** + * Add a processor of EVIO events. + * + * @param processor the EVIO event processor + */ void addProcessor(final EvioEventProcessor processor) { this.processors.add(processor); - LOGGER.config("added processor: " + processor.getClass().getSimpleName()); - } - + LOGGER.config("added processor " + processor.getClass().getSimpleName()); + } + + /** + * Cache all files and wait for the operation to complete. + * <p> + * Potentially, this operation can take a very long time. This can be managed using the {@link JCacheManager#setWaitTime(long)} method to set a + * timeout. + */ + private void cacheFiles() { + + LOGGER.info("caching files from run " + this.runSummary.getRun() + " ..."); + + // Cache all the files and wait for the operation to complete (it will take awhile!). + this.cacheManager.cache(this.getFiles()); + final boolean cached = this.cacheManager.waitForCache(); + + // If the files weren't cached then die. 
+ if (!cached) { + throw new RuntimeException("The cache process did not complete in time."); + } + + LOGGER.info("done caching files from run " + this.runSummary.getRun()); + } + + Integer computeEventCount(final EvioReader reader) throws IOException, EvioException { + return reader.getEventCount(); + } + + /** + * Get the list of files to process, which will be limited by the {@link #maxFiles} value if it is set. + * + * @return the files to process + */ + private List<File> getFiles() { + // Get the list of files to process, taking into account the max files setting. + List<File> files = this.runSummary.getEvioFileList(); + if (this.maxFiles != -1) { + LOGGER.info("limiting processing to first " + this.maxFiles + " files from max files setting"); + files = files.subList(0, this.maxFiles - 1); + } + return files; + } + + /** + * Get the list of EVIO processors. + * + * @return the list of EVIO processors + */ List<EvioEventProcessor> getProcessors() { return this.processors; } + boolean isEndOkay(final EvioReader reader) throws Exception { + LOGGER.info("checking is END okay ..."); + boolean endOkay = false; + reader.gotoEventNumber(reader.getEventCount() - 2); + EvioEvent event = null; + while ((event = reader.parseNextEvent()) != null) { + if (event.getHeader().getTag() == EvioEventConstants.END_EVENT_TAG) { + endOkay = true; + break; + } + } + return endOkay; + } + + /** + * Process the run. + * + * @throws Exception if there is an error processing a file + */ void process() throws Exception { - + + LOGGER.info("processing run " + this.runSummary.getRun() + " ..."); + + // First cache all the files we will process, if necessary. + if (this.useFileCache) { + this.cacheFiles(); + } + // Run the start of job hooks. for (final EvioEventProcessor processor : this.processors) { processor.startJob(); } - - - - // Process the files. - for (final File file : this.runSummary.getFiles()) { - process(file); - } - + + // Process all the files. 
+ for (final File file : this.getFiles()) { + this.process(file); + } + // Run the end of job hooks. for (final EvioEventProcessor processor : this.processors) { processor.endJob(); } - } - + + LOGGER.info("done processing run " + this.runSummary.getRun()); + } + + /** + * Process a single EVIO file from the run. + * + * @param file the EVIO file + * @throws EvioException if there is an EVIO error + * @throws IOException if there is some kind of IO error + * @throws Exception if there is a generic error thrown by event processing + */ + // FIXME: I think this method is terribly inefficient right now. private void process(final File file) throws EvioException, IOException, Exception { + LOGGER.fine("processing " + file.getPath() + " ..."); + EvioReader reader = null; try { // Open with wrapper method which will use the cached file path if necessary. + LOGGER.fine("opening " + file.getPath() + " for reading ..."); reader = EvioFileUtilities.open(file); - - // Compute event count for the file and store the value. - this.runSummary.getFiles().computeEventCount(reader, file); - + LOGGER.fine("done opening " + file.getPath()); + + // If this is the first file then get the start date. + if (file.equals(this.runSummary.getEvioFileList().first())) { + LOGGER.fine("getting run start ..."); + final Date runStart = EvioFileUtilities.getRunStart(file); + LOGGER.fine("got run start " + runStart); + this.runSummary.setStartDate(runStart); + } + + // Compute event count for the file and store the value in the run summary's file list. + LOGGER.info("getting event count for " + file.getPath() + "..."); + final int eventCount = this.computeEventCount(reader); + this.runSummary.getEvioFileList().setEventCount(file, eventCount); + LOGGER.info("set event count " + eventCount + " for " + file.getPath()); + // Process the events using the list of EVIO processors. 
- EvioEvent event = null; - while ((event = reader.parseNextEvent()) != null) { - for (final EvioEventProcessor processor : this.processors) { - processor.process(event); + LOGGER.info("running EVIO processors ..."); + reader.gotoEventNumber(0); + int nProcessed = 0; + if (!this.processors.isEmpty()) { + EvioEvent event = null; + while ((event = reader.parseNextEvent()) != null) { + for (final EvioEventProcessor processor : this.processors) { + processor.process(event); + ++nProcessed; + if (nProcessed % this.eventPrintInterval == 0) { + LOGGER.finer("processed " + nProcessed + " EVIO events"); + } + } } } + LOGGER.info("done running EVIO processors"); + + // Check if END event is present if this is the last file in the run. + if (file.equals(this.runSummary.getEvioFileList().last())) { + LOGGER.info("checking end okay ..."); + final boolean endOkay = this.isEndOkay(reader); + this.runSummary.setEndOkay(endOkay); + LOGGER.info("endOkay set to " + endOkay); + + LOGGER.info("getting end date ..."); + final Date endDate = EvioFileUtilities.getRunEnd(file); + this.runSummary.setEndDate(endDate); + LOGGER.info("found end date " + endDate); + } + } finally { if (reader != null) { reader.close(); } } - } - + LOGGER.fine("done processing " + file.getPath()); + } + + /** + * Set the event print interval when running the EVIO processors. + * + * @param eventPrintInterval the event print interval when running the EVIO processors + */ + void setEventPrintInterval(final int eventPrintInterval) { + this.eventPrintInterval = eventPrintInterval; + } + + /** + * Set the maximum number of files to process. + * <p> + * This is primarily used for debugging purposes. + * + * @param maxFiles the maximum number of files to process + */ + void setMaxFiles(final int maxFiles) { + this.maxFiles = maxFiles; + LOGGER.config("max files set to " + maxFiles); + } + + /** + * Set whether or not to use the file caching, which copies files from the JLAB MSS to the cache disk. 
+ * <p> + * Since EVIO data files at JLAB are primarily kept on the MSS, running without this option enabled there will likely cause the job to fail. + * + * @param cacheFiles <code>true</code> to enable file caching + */ + void useFileCache(final boolean cacheFiles) { + this.useFileCache = cacheFiles; + LOGGER.config("file caching enabled"); + } + } Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunSummary.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunSummary.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunSummary.java Wed May 20 14:39:52 2015 @@ -1,17 +1,12 @@ package org.hps.record.evio.crawler; import java.io.File; -import java.io.IOException; import java.io.PrintStream; import java.util.Date; import java.util.Map; import java.util.logging.Logger; import org.hps.record.epics.EpicsData; -import org.hps.record.evio.EvioEventConstants; -import org.jlab.coda.jevio.EvioEvent; -import org.jlab.coda.jevio.EvioException; -import org.jlab.coda.jevio.EvioReader; import org.lcsim.util.log.LogUtil; /** @@ -31,21 +26,65 @@ */ final class RunSummary { + /** + * Setup logger. + */ private static final Logger LOGGER = LogUtil.create(RunSummary.class); + /** + * The end date of the run. + */ private Date endDate; + + /** + * This is <code>true</code> if the END event is found in the data. + */ + private boolean endOkay; + + /** + * The combined EPICS information for the run (uses the mean values for each variable). + */ private EpicsData epics; + + /** + * The counts of different types of events that were found. + */ private Map<Object, Integer> eventTypeCounts; + + /** + * The list of EVIO files in the run. + */ private final EvioFileList files = new EvioFileList(); - private Boolean isEndOkay; + + /** + * The run number. + */ private final int run; + + /** + * The start date of the run.
+ */ private Date startDate; + + /** + * The total events found in the run across all files. + */ private int totalEvents = -1; + /** + * Create a run summary. + * + * @param run the run number + */ RunSummary(final int run) { this.run = run; } + /** + * Add an EVIO file from this run to the list. + * + * @param file the file to add + */ void addFile(final File file) { this.files.add(file); @@ -53,77 +92,97 @@ this.totalEvents = -1; } + /** + * Get the date when the run ended. + * <p> + * This will be extracted from the EVIO END event. If there is no END record it will be the last event time. + * + * @return the date when the run ended + */ Date getEndDate() { - if (this.endDate == null) { - this.endDate = EvioFileUtilities.getRunEnd(this.files.last()); - } return this.endDate; } + /** + * Get the EPICS data summary. + * <p> + * This is computed by taking the mean of each variable for the run. + * + * @return the EPICS data summary + */ EpicsData getEpicsData() { return this.epics; } + /** + * Get the counts of different event types. + * + * @return the counts of different event types + */ Map<Object, Integer> getEventTypeCounts() { return this.eventTypeCounts; } - EvioFileList getFiles() { + /** + * Get the list of EVIO files in this run. + * + * @return the list of EVIO files in this run + */ + EvioFileList getEvioFileList() { return this.files; } + /** + * Get the run number. + * + * @return the run number + */ + int getRun() { + return this.run; + } + + /** + * Get the start date of the run. + * + * @return the start date of the run + */ Date getStartDate() { - if (this.startDate == null) { - this.startDate = EvioFileUtilities.getRunStart(this.files.first()); - } return this.startDate; } + /** + * Get the total events in the run. 
+ * + * @return the total events in the run + */ int getTotalEvents() { if (this.totalEvents == -1) { - this.totalEvents = this.files.computeTotalEvents(); + this.totalEvents = this.files.getTotalEvents(); } return this.totalEvents; } + /** + * Return <code>true</code> if END event was found in the data. + * + * @return <code>true</code> if END event was in the data + */ boolean isEndOkay() { - if (this.isEndOkay == null) { - LOGGER.info("checking is END okay ..."); - this.isEndOkay = false; - final File lastFile = this.files.last(); - EvioReader reader = null; - try { - reader = EvioFileUtilities.open(lastFile); - reader.gotoEventNumber(reader.getEventCount() - 5); - EvioEvent event = null; - while ((event = reader.parseNextEvent()) != null) { - if (event.getHeader().getTag() == EvioEventConstants.END_EVENT_TAG) { - this.isEndOkay = true; - break; - } - } - } catch (EvioException | IOException e) { - throw new RuntimeException(e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (final IOException e) { - e.printStackTrace(); - } - } - } - } - return this.isEndOkay; - } - + return this.endOkay; + } + + /** + * Print the run summary. + * + * @param ps the print stream for output + */ void printRunSummary(final PrintStream ps) { ps.println("--------------------------------------------"); ps.println("run: " + this.run); ps.println("first file: " + this.files.first()); ps.println("last file: " + this.files.last()); - ps.println("started: " + getStartDate()); - ps.println("ended: " + getEndDate()); + ps.println("started: " + this.getStartDate()); + ps.println("ended: " + this.getEndDate()); ps.println("total events: " + this.getTotalEvents()); ps.println("event types"); for (final Object key : this.eventTypeCounts.keySet()) { @@ -135,14 +194,54 @@ } } + /** + * Set the end date. + * + * @param endDate the end date + */ + void setEndDate(final Date endDate) { + this.endDate = endDate; + } + + /** + * Set if end is okay. 
+ * + * @param endOkay <code>true</code> if end is okay + */ + void setEndOkay(final boolean endOkay) { + this.endOkay = endOkay; + } + + /** + * Set the EPICS data for the run. + * + * @param epics the EPICS data for the run + */ void setEpicsData(final EpicsData epics) { this.epics = epics; } + /** + * Set the event type counts for the run. + * + * @param eventTypeCounts the event type counts for the run + */ void setEventTypeCounts(final Map<Object, Integer> eventTypeCounts) { this.eventTypeCounts = eventTypeCounts; } + /** + * Set the start date of the run. + * + * @param startDate the start date of the run + */ + void setStartDate(final Date startDate) { + this.startDate = startDate; + } + + /** + * Sort the files in the run by sequence number in place. + */ void sortFiles() { this.files.sort(); } Added: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/package-info.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/package-info.java (added) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/package-info.java Wed May 20 14:39:52 2015 @@ -0,0 +1,6 @@ +/** + * Implements an EVIO file crawler for extracting run and configuration information, including run start and end dates, event counts, etc. + * + * @author Jeremy McCormick + */ +package org.hps.record.evio.crawler;