Author: [log in to unmask] Date: Tue Sep 1 19:04:51 2015 New Revision: 3487 Log: Rework extraction of EVIO metadata so it is included with the event processing via EvioLoop. Added: java/trunk/crawler/src/main/java/org/hps/crawler/EvioDatacatUtilities.java java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetadata.java - copied, changed from r3467, java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetaData.java java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetadataAdapter.java java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetadataProcessor.java Removed: java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetaData.java Modified: java/trunk/crawler/src/main/java/org/hps/crawler/Crawler.java java/trunk/crawler/src/main/java/org/hps/crawler/CrawlerConfig.java java/trunk/crawler/src/main/java/org/hps/crawler/RunProcessor.java Modified: java/trunk/crawler/src/main/java/org/hps/crawler/Crawler.java ============================================================================= --- java/trunk/crawler/src/main/java/org/hps/crawler/Crawler.java (original) +++ java/trunk/crawler/src/main/java/org/hps/crawler/Crawler.java Tue Sep 1 19:04:51 2015 @@ -7,10 +7,10 @@ import java.nio.file.attribute.BasicFileAttributes; import java.sql.Connection; import java.sql.SQLException; -import java.util.ArrayList; import java.util.Date; import java.util.EnumSet; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -21,8 +21,10 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.hps.conditions.database.ConnectionParameters; +import org.hps.record.evio.EvioFileMetadata; import org.hps.run.database.RunDatabaseDaoFactory; import org.hps.run.database.RunSummary; +import org.hps.run.database.RunSummaryDao; import org.hps.run.database.RunSummaryImpl; import 
org.lcsim.util.log.DefaultLogFormatter; import org.lcsim.util.log.LogUtil; @@ -55,18 +57,19 @@ */ static { OPTIONS.addOption("b", "min-date", true, "min date for a file (example \"2015-03-26 11:28:59\")"); - OPTIONS.addOption("C", "cache", false, "automatically cache files from MSS to cache disk (JLAB only)"); + OPTIONS.addOption("c", "datacat", true, "update the data catalog using the specified folder (off by default)"); + OPTIONS.addOption("C", "cache", false, "cache files from MSS (JLAB only and not for batch farm use!)"); OPTIONS.addOption("p", "connection-properties", true, "database connection properties file (required)"); OPTIONS.addOption("d", "directory", true, "root directory to start crawling (default is current dir)"); OPTIONS.addOption("E", "evio-processor", true, "class name of an EvioEventProcessor to execute"); OPTIONS.addOption("h", "help", false, "print help and exit (overrides all other arguments)"); OPTIONS.addOption("i", "insert", false, "insert information into the run database (not done by default)"); OPTIONS.addOption("L", "log-level", true, "set the log level (INFO, FINE, etc.)"); - OPTIONS.addOption("r", "run", true, "add a run number to accept (when used others will be excluded)"); + OPTIONS.addOption("r", "run", true, "add a run number to accept (others will be excluded)"); OPTIONS.addOption("t", "timestamp-file", true, "existing or new timestamp file name"); - OPTIONS.addOption("w", "max-cache-wait", true, "total time to allow for file caching (seconds)"); + OPTIONS.addOption("w", "max-cache-wait", true, "time per run allowed for file caching in seconds"); OPTIONS.addOption("u", "update", false, "allow replacement of existing data in the run db (off by default)"); - OPTIONS.addOption("x", "max-depth", true, "max depth to crawl in the directory tree"); + OPTIONS.addOption("x", "max-depth", true, "max depth to crawl"); } /** @@ -88,26 +91,21 @@ * @param runs the run log containing the list of run summaries * @throws Exception if there is 
an error processing one of the runs */ - static void processRuns(final JCacheManager cacheManager, final RunSummaryMap runs, final boolean useFileCache) - throws Exception { - - // Process all of the runs that were found. - for (final RunSummary runSummary : runs.getRunSummaries()) { - - // Clear the cache manager. - if (useFileCache) { - LOGGER.info("clearing file cache"); - cacheManager.clear(); - } - - // Create a processor to process all the EVIO events in the run. - LOGGER.info("creating run processor for " + runSummary.getRun()); - final RunProcessor runProcessor = new RunProcessor(cacheManager, (RunSummaryImpl) runSummary, useFileCache); - - // Process all of the files from the run. - LOGGER.info("processing run " + runSummary.getRun()); - runProcessor.processRun(); - } + static RunProcessor processRun(final RunSummary runSummary) throws Exception { + + LOGGER.info("processing run" + runSummary.getRun()); + + // Create a processor to process all the EVIO events in the run. + LOGGER.info("creating run processor for " + runSummary.getRun()); + final RunProcessor runProcessor = new RunProcessor((RunSummaryImpl) runSummary); + + // Process all of the files from the run. + LOGGER.info("processing run " + runSummary.getRun()); + runProcessor.processRun(); + + LOGGER.getHandlers()[0].flush(); + + return runProcessor; } /** @@ -124,6 +122,35 @@ * The options parser. */ private final PosixParser parser = new PosixParser(); + + /** + * Cache all files and wait for the operation to complete. + * <p> + * Potentially, this operation can take a very long time. This can be managed using the + * {@link JCacheManager#setWaitTime(long)} method to set a timeout. + */ + private void cacheFiles(final RunSummary runSummary) { + if (this.config.useFileCache()) { + + LOGGER.info("caching files for run " + runSummary.getRun()); + + // Cache all the files and wait for the operation to complete. 
+ this.cacheManager.cache(runSummary.getEvioFiles()); + final boolean cached = this.cacheManager.waitForCache(); + + // If the files weren't cached then die. + if (!cached) { + throw new RuntimeException("The cache process did not complete in time."); + } + + LOGGER.info("done caching files from run " + runSummary.getRun()); + + } else { + LOGGER.info("file caching disabled"); + } + + LOGGER.getHandlers()[0].flush(); + } /** * Parse command line options and create a new {@link Crawler} object from the configuration. @@ -263,6 +290,7 @@ final String[] classNames = cl.getOptionValues("E"); for (final String className : classNames) { try { + LOGGER.config("adding extra EVIO processor " + className); config.addProcessor(className); } catch (final Exception e) { throw new RuntimeException(e); @@ -277,6 +305,18 @@ throw new IllegalArgumentException("invalid -x argument for maxDepth: " + maxDepth); } config.setMaxDepth(maxDepth); + LOGGER.config("set max depth to " + maxDepth); + } + + // Update data catalog. + if (cl.hasOption("c")) { + final String datacatFolder = cl.getOptionValue("c"); + if (datacatFolder == null) { + throw new IllegalArgumentException("missing -c argument with data catalog folder"); + } + LOGGER.config("using data catalog folder " + datacatFolder); + config.setDatacatFolder(datacatFolder); + config.setUpdateDatacat(true); } } catch (final ParseException e) { @@ -310,37 +350,59 @@ */ public void run() throws Exception { - LOGGER.info("running Crawler job"); + LOGGER.info("starting Crawler job"); // Create the file visitor for crawling the root directory with the given date filter. - LOGGER.info("creating file visitor"); - LOGGER.getHandlers()[0].flush(); final EvioFileVisitor visitor = new EvioFileVisitor(config.timestamp()); // Walk the file tree using the visitor. - LOGGER.info("walking the dir tree"); - LOGGER.getHandlers()[0].flush(); this.walk(visitor); // Get the list of run data created by the visitor. 
- final RunSummaryMap runs = visitor.getRunMap(); - - // Process all the files, performing caching from the MSS if necessary. - LOGGER.info("processing all runs"); - processRuns(this.cacheManager, runs, config.useFileCache()); - LOGGER.getHandlers()[0].flush(); - - // Execute the run database update. - LOGGER.info("updating run database"); - this.updateRunDatabase(runs); - LOGGER.getHandlers()[0].flush(); + final RunSummaryMap runMap = visitor.getRunMap(); + + // Process all runs that were found. + for (RunSummary runSummary : runMap.getRunSummaries()) { + + if (runSummary == null) { + throw new IllegalArgumentException("The run summary is null for some weird reason."); + } + + LOGGER.info("starting full processing of run " + runSummary.getRun()); + + // Cache files from MSS. + this.cacheFiles(runSummary); + + // Process the run's files. + RunProcessor runProcessor = processRun(runSummary); + + // Execute the run database update. + this.updateRunDatabase(runSummary); + + // Update the data catalog. + this.updateDatacat(runProcessor.getEvioFileMetaData()); + + LOGGER.info("completed full processing of run " + runSummary); + } // Update the timestamp output file. - LOGGER.info("updating the timestamp"); this.updateTimestamp(); - LOGGER.getHandlers()[0].flush(); LOGGER.info("Crawler job is done!"); + } + + /** + * Update the data catalog. 
+ * + * @param runMap the map of run information including the EVIO file list + */ + private void updateDatacat(List<EvioFileMetadata> metadataList) { + if (this.config.updateDatacat()) { + EvioDatacatUtilities.addEvioFiles(metadataList, config.datacatFolder()); + LOGGER.info("done updating data catalog"); + } else { + LOGGER.info("updating data catalog is disabled"); + } } /** @@ -349,20 +411,33 @@ * @param runs the list of runs to update * @throws SQLException if there is a database query error */ - private void updateRunDatabase(final RunSummaryMap runs) throws SQLException { + private void updateRunDatabase(final RunSummary runSummary) throws SQLException { // Insert the run information into the database. if (config.updateRunDatabase()) { - LOGGER.info("updating run database is enabled"); + LOGGER.info("updating run database for run " + runSummary.getRun()); // Open a DB connection. final Connection connection = config.connectionParameters().createConnection(); + // Create factory for interfacing to run database. final RunDatabaseDaoFactory dbFactory = new RunDatabaseDaoFactory(connection); - // Insert all run summaries into the database. - dbFactory.createRunSummaryDao().insertFullRunSummaries(new ArrayList<RunSummary>(runs.getRunSummaries()), - config.allowUpdates()); + // Create object for updating run info in the database. + final RunSummaryDao runSummaryDao = dbFactory.createRunSummaryDao(); + + // Delete existing run summary if necessary. + if (runSummaryDao.runSummaryExists(runSummary.getRun())) { + if (this.config.allowUpdates()) { + LOGGER.info("deleting existing information for run " + runSummary.getRun()); + runSummaryDao.deleteFullRunSummary(runSummary); + } else { + throw new RuntimeException("Run " + runSummary.getRun() + " exists in database and deletion is not enabled."); + } + } + + // Insert run summary into database. + runSummaryDao.insertFullRunSummary(runSummary); // Close the DB connection. 
connection.close(); @@ -370,28 +445,32 @@ LOGGER.info("done updating run database"); } else { - LOGGER.info("updating run database is not enabled"); - } + LOGGER.info("updating run database is disabled"); + } + + LOGGER.getHandlers()[0].flush(); } /** * Update the timestamp file's modification date to the time when this job ended. * <p> - * This can be then be used in subsequent crawl jobs to filter out files that have already been seen. + * This can then be used in subsequent crawl jobs to filter out files that have already been processed. */ private void updateTimestamp() { - // Update the timestamp file which can be used to tell which files have been processed by their creation date. if (config.timestampFile() == null) { + + LOGGER.info("creating default timestamp file"); + config.setTimestampFile(new File("timestamp")); try { config.timestampFile().createNewFile(); } catch (final IOException e) { throw new RuntimeException(e); } - LOGGER.config("created new timestamp file: " + config.timestampFile().getPath()); } config.timestampFile().setLastModified(System.currentTimeMillis()); - LOGGER.config("set modified on timestamp file: " + new Date(config.timestampFile().lastModified())); + LOGGER.info("touching timestamp file " + config.timestampFile().getPath()); + LOGGER.info("set modified on timestamp file: " + new Date(config.timestampFile().lastModified())); } /** Modified: java/trunk/crawler/src/main/java/org/hps/crawler/CrawlerConfig.java ============================================================================= --- java/trunk/crawler/src/main/java/org/hps/crawler/CrawlerConfig.java (original) +++ java/trunk/crawler/src/main/java/org/hps/crawler/CrawlerConfig.java Tue Sep 1 19:04:51 2015 @@ -43,6 +43,13 @@ private ConnectionParameters connectionParameters; /** + * The name of the folder in the data catalog for inserting data (under "/HPS" root folder). + * <p> + * No default is provided; this must be set explicitly (e.g. via the -c command line option). 
+ */ + private String datacatFolder = null; + + /** * The maximum depth to crawl. */ private Integer maxDepth = Integer.MAX_VALUE; @@ -71,6 +78,11 @@ * A file to use for getting the timestamp date. */ private File timestampFile = null; + + /** + * <code>true</code> if the data catalog should be updated (off by default). + */ + private boolean updateDatacat = false; /** * <code>true</code> if the run database should be updated from results of the job. @@ -141,6 +153,15 @@ } /** + * Get the data catalog folder. + * + * @return the data catalog folder + */ + String datacatFolder() { + return this.datacatFolder; + } + + /** * Get the max depth in the directory tree to crawl. * * @return the max depth @@ -212,6 +233,16 @@ } /** + * Set the data catalog folder. + * + * @param datacatFolder the data catalog folder + */ + CrawlerConfig setDatacatFolder(final String datacatFolder) { + this.datacatFolder = datacatFolder; + return this; + } + + /** * Set the max depth. * * @param maxDepth the max depth @@ -284,6 +315,16 @@ } /** + * Set to <code>true</code> to update data catalog. + * + * @param updateDatacat <code>true</code> to update data catalog + */ + CrawlerConfig setUpdateDatacat(final boolean updateDatacat) { + this.updateDatacat = updateDatacat; + return this; + } + + /** * Set whether the run database should be updated in the job. * <p> * This will not allow replacement of existing run log records. The {@link #allowUpdates()} flag must be on for this + * * Get whether data catalog should be updated or not. + * + * @return <code>true</code> if data catalog should be updated + */ + boolean updateDatacat() { + return this.updateDatacat; + } + + /** * Return <code>true</code> if the run database should be updated 
* * @return <code>true</code> if the run database should be updated Added: java/trunk/crawler/src/main/java/org/hps/crawler/EvioDatacatUtilities.java ============================================================================= --- java/trunk/crawler/src/main/java/org/hps/crawler/EvioDatacatUtilities.java (added) +++ java/trunk/crawler/src/main/java/org/hps/crawler/EvioDatacatUtilities.java Tue Sep 1 19:04:51 2015 @@ -0,0 +1,100 @@ +package org.hps.crawler; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.hps.datacat.client.DatacatClient; +import org.hps.datacat.client.DatacatClientFactory; +import org.hps.datacat.client.DatasetDataType; +import org.hps.datacat.client.DatasetFileFormat; +import org.hps.datacat.client.DatasetSite; +import org.hps.record.evio.EvioFileMetadata; +import org.lcsim.util.log.DefaultLogFormatter; +import org.lcsim.util.log.LogUtil; + +/** + * Utility for inserting EVIO files into the SRS data catalog. + * + * @author Jeremy McCormick, SLAC + */ +final class EvioDatacatUtilities { + + /** + * Setup logger. + */ + private static final Logger LOGGER = LogUtil.create(EvioDatacatUtilities.class, new DefaultLogFormatter(), Level.FINE); + + /** + * Add an EVIO file to the data catalog. + * + * @param client the data catalog client + * @param folder the folder name e.g. "data/raw" + * @param evioMetadata the EVIO file's meta data (also supplies the EVIO file itself) + * @return the HTTP response code + */ + static int addEvioFile(final DatacatClient client, final String folder, final EvioFileMetadata evioMetadata) { + + // Create metadata map for adding dataset. + final Map<String, Object> metadataMap = createMetadataMap(evioMetadata); + + // Get the EVIO file. + File evioFile = evioMetadata.getEvioFile(); + + // Add the dataset to the data catalog using the REST API. 
+ final int response = client.addDataset(folder, DatasetDataType.RAW, evioFile.getPath(), DatasetSite.SLAC, + DatasetFileFormat.EVIO, evioFile.getName(), metadataMap); + + return response; + } + + /** + * Add a list of EVIO files to the data catalog. + * + * @param evioFiles the list of EVIO files + * @param folder the folder in the data catalog + */ + static void addEvioFiles(List<EvioFileMetadata> metadataList, final String folder) { + LOGGER.info("adding " + metadataList.size() + " EVIO files to data catalog in folder " + folder); + final DatacatClientFactory datacatFactory = new DatacatClientFactory(); + final DatacatClient datacatClient = datacatFactory.createClient(); + for (EvioFileMetadata metadata : metadataList) { + LOGGER.info("adding " + metadata.getEvioFile().getPath() + " to data catalog"); + EvioDatacatUtilities.addEvioFile(datacatClient, folder, metadata); + } + } + + /** + * Create a map of metadata keys and values suitable for making a new dataset. + * + * @param evioMetadata the EVIO metadata object + * @return the metadata map + */ + static Map<String, Object> createMetadataMap(EvioFileMetadata evioMetadata) { + final Map<String, Object> metadataMap = new HashMap<String, Object>(); + metadataMap.put("runMin", evioMetadata.getRun()); + metadataMap.put("runMax", evioMetadata.getRun()); + metadataMap.put("eventCount", evioMetadata.getEventCount()); + metadataMap.put("size", evioMetadata.getByteCount()); + metadataMap.put("fileNumber", evioMetadata.getSequence()); + metadataMap.put("badEventCount", evioMetadata.getBadEventCount()); + metadataMap.put("endTimestamp", evioMetadata.getEndDate().getTime()); + metadataMap.put("startTimestamp", evioMetadata.getStartDate().getTime()); + metadataMap.put("startEvent", evioMetadata.getStartEvent()); + metadataMap.put("endEvent", evioMetadata.getEndEvent()); + metadataMap.put("hasEnd", evioMetadata.hasEnd() ? 1 : 0); + metadataMap.put("hasPrestart", evioMetadata.hasPrestart() ? 
1 : 0); + return metadataMap; + } + + /** + * Class constructor which is private. + */ + private EvioDatacatUtilities() { + throw new RuntimeException("Do not instantiate this class."); + } +} Modified: java/trunk/crawler/src/main/java/org/hps/crawler/RunProcessor.java ============================================================================= --- java/trunk/crawler/src/main/java/org/hps/crawler/RunProcessor.java (original) +++ java/trunk/crawler/src/main/java/org/hps/crawler/RunProcessor.java Tue Sep 1 19:04:51 2015 @@ -1,13 +1,13 @@ package org.hps.crawler; -import java.io.File; import java.util.Collections; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import org.hps.record.epics.EpicsRunProcessor; -import org.hps.record.evio.EvioFileMetaData; -import org.hps.record.evio.EvioFileMetaDataReader; +import org.hps.record.evio.EvioFileMetadata; +import org.hps.record.evio.EvioFileMetadataAdapter; import org.hps.record.evio.EvioFileSequenceComparator; import org.hps.record.evio.EvioFileSource; import org.hps.record.evio.EvioLoop; @@ -37,11 +37,6 @@ private static final Logger LOGGER = LogUtil.create(RunProcessor.class, new DefaultLogFormatter(), Level.FINE); /** - * The cache manager. - */ - private final JCacheManager cacheManager; - - /** * Processor for extracting EPICS information. */ private final EpicsRunProcessor epicsProcessor; @@ -70,11 +65,11 @@ * Processor for extracting TI time offset. */ private final TiTimeOffsetEvioProcessor triggerTimeProcessor; - - /** - * Set to <code>true</code> to use file caching. - */ - private final boolean useFileCache; + + /** + * Record loop adapter for getting file metadata. + */ + private final EvioFileMetadataAdapter metadataAdapter = new EvioFileMetadataAdapter(); /** * Create a run processor. 
@@ -82,13 +77,9 @@ * @param runSummary the run summary object for the run * @return the run processor */ - RunProcessor(final JCacheManager cacheManager, final RunSummaryImpl runSummary, final boolean useFileCache) { + RunProcessor(final RunSummaryImpl runSummary) { this.runSummary = runSummary; - this.cacheManager = cacheManager; - - // Set whether file caching from MSS is enabled. - this.useFileCache = useFileCache; // Sort the list of EVIO files. Collections.sort(runSummary.getEvioFiles(), new EvioFileSequenceComparator()); @@ -109,63 +100,46 @@ // Add processor for extracting TI time offset. triggerTimeProcessor = new TiTimeOffsetEvioProcessor(); evioLoop.addEvioEventProcessor(triggerTimeProcessor); - } - - /** - * Cache all files and wait for the operation to complete. - * <p> - * Potentially, this operation can take a very long time. This can be managed using the - * {@link JCacheManager#setWaitTime(long)} method to set a timeout. - */ - private void cacheFiles() { - - LOGGER.info("caching files from run " + this.runSummary.getRun()); - - // Cache all the files and wait for the operation to complete. - this.cacheManager.cache(this.runSummary.getEvioFiles()); - final boolean cached = this.cacheManager.waitForCache(); - - // If the files weren't cached then die. - if (!cached) { - throw new RuntimeException("The cache process did not complete in time."); - } - - LOGGER.info("done caching files from run " + this.runSummary.getRun()); + + // Add file metadata processor. + evioLoop.addRecordListener(metadataAdapter); + evioLoop.addLoopListener(metadataAdapter); } /** * Extract meta data from first file in run. 
*/ - private void processFirstFile() { - final File firstEvioFile = runSummary.getEvioFiles().get(0); - LOGGER.info("getting meta data for " + firstEvioFile.getPath()); - final EvioFileMetaDataReader metaDataReader = new EvioFileMetaDataReader(); - final EvioFileMetaData metaData = metaDataReader.getMetaData(firstEvioFile); - LOGGER.info(metaData.toString()); - if (metaData.getStartDate() == null) { - throw new IllegalStateException("The start date is not set in the EVIO file meta data from " - + firstEvioFile.getPath()); - } - LOGGER.info("setting unix start time to " + metaData.getStartDate().getTime() + " from meta data"); - runSummary.setStartDate(metaData.getStartDate()); + private void processFirstFile() { + final EvioFileMetadata metadata = metadataAdapter.getEvioFileMetadata().get(0); + if (metadata == null) { + throw new IllegalStateException("No meta data exists for first file."); + } + LOGGER.info("first file metadata: " + metadata.toString()); + if (metadata.getStartDate() == null) { + throw new IllegalStateException("The start date is not set in the metadata."); + } + LOGGER.info("setting unix start time to " + metadata.getStartDate().getTime() + " from meta data"); + runSummary.setStartDate(metadata.getStartDate()); } /** * Extract meta data from last file in run. 
*/ private void processLastFile() { - final File lastEvioFile = runSummary.getEvioFiles().get(runSummary.getEvioFiles().size() - 1); - LOGGER.info("getting meta data for " + lastEvioFile.getPath()); - final EvioFileMetaDataReader metaDataReader = new EvioFileMetaDataReader(); - final EvioFileMetaData metaData = metaDataReader.getMetaData(lastEvioFile); - LOGGER.info(metaData.toString()); - if (metaData.getEndDate() == null) { - throw new IllegalStateException("The end date is not set in the EVIO file meta data from " - + lastEvioFile.getPath()); - } - LOGGER.info("setting unix end time to " + metaData.getEndDate().getTime() + " from meta data"); - runSummary.setEndDate(metaData.getEndDate()); - runSummary.setEndOkay(metaData.hasEnd()); + LOGGER.info("looking for " + runSummary.getEvioFiles().get(runSummary.getEvioFiles().size() - 1).getPath() + " metadata"); + LOGGER.getHandlers()[0].flush(); + final EvioFileMetadata metadata = this.metadataAdapter.getEvioFileMetadata().get(this.metadataAdapter.getEvioFileMetadata().size() - 1); + if (metadata == null) { + throw new IllegalStateException("Failed to find metadata for last file."); + } + LOGGER.info("last file metadata: " + metadata.toString()); + if (metadata.getEndDate() == null) { + throw new IllegalStateException("The end date is not set in the metadata."); + } + LOGGER.info("setting unix end time to " + metadata.getEndDate().getTime() + " from meta data"); + runSummary.setEndDate(metadata.getEndDate()); + LOGGER.info("setting has END to " + metadata.hasEnd()); + runSummary.setEndOkay(metadata.hasEnd()); } /** @@ -181,21 +155,18 @@ LOGGER.info("processing " + this.runSummary.getEvioFiles().size() + " files from run " + this.runSummary.getRun()); - // Cache files from MSS if this is enabled. - if (this.useFileCache) { - LOGGER.info("caching files from MSS"); - this.cacheFiles(); - } - // Run processors over all files. LOGGER.info("looping over all events"); evioLoop.loop(-1); - - // Get run start date. 
+ + LOGGER.info("got " + metadataAdapter.getEvioFileMetadata().size() + " metadata objects from loop"); + LOGGER.getHandlers()[0].flush(); + + // Set start date from first file. LOGGER.info("processing first file"); this.processFirstFile(); - // Get run end date. + // Set end date from last file. LOGGER.info("processing last file"); this.processLastFile(); @@ -203,7 +174,7 @@ LOGGER.info("updating run summary"); this.updateRunSummary(); - LOGGER.info("done processing run " + this.runSummary.getRun()); + LOGGER.info("run processor done with run " + this.runSummary.getRun()); } /** @@ -211,8 +182,8 @@ */ private void updateRunSummary() { + // Set total number of events from the event loop. LOGGER.info("setting total events " + evioLoop.getTotalCountableConsumed()); - // Set total number of events from the event loop. runSummary.setTotalEvents((int) evioLoop.getTotalCountableConsumed()); // Add scaler data from the scalers EVIO processor. @@ -231,5 +202,14 @@ runSummary.setTriggerConfigInt(triggerConfig); LOGGER.getHandlers()[0].flush(); + } + + /** + * Get list of metadata created by processing the files. + * + * @return the list of metadata + */ + List<EvioFileMetadata> getEvioFileMetaData() { + return this.metadataAdapter.getEvioFileMetadata(); } } Copied: java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetadata.java (from r3467, java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetaData.java) ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetaData.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetadata.java Tue Sep 1 19:04:51 2015 @@ -2,15 +2,13 @@ import java.io.File; import java.util.Date; -import java.util.HashMap; -import java.util.Map; /** * Meta data that can be extracted from EVIO files. 
* * @author Jeremy McCormick, SLAC */ -public final class EvioFileMetaData { +public final class EvioFileMetadata { /** * The number of bad events in the file that are unreadable. @@ -77,15 +75,17 @@ * * @param evioFile the EVIO file to which the meta data applies */ - public EvioFileMetaData(final File evioFile) { + public EvioFileMetadata(final File evioFile) { if (evioFile == null) { throw new IllegalArgumentException("The EVIO file argument is null."); } if (!evioFile.exists()) { - throw new IllegalArgumentException("The file " + evioFile.getPath() - + " does not exist or it is inaccessible."); + throw new IllegalArgumentException("The file " + evioFile.getPath() + " does not exist."); } this.evioFile = evioFile; + + // Set sequence number. + setSequence(EvioFileUtilities.getSequenceFromName(this.evioFile)); } /** @@ -314,28 +314,4 @@ + this.hasEnd + ", run: " + this.run + ", fileNumber: " + sequence + ", startEvent: " + this.startEvent + ", endEvent: " + endEvent + " }"; } - - /** - * Convert data to a map with names and values (int, float or string). - * - * @return the metadata converted to a map - */ - public Map<String, Object> toMap() { - Map<String, Object> metadataMap = new HashMap<String, Object>(); - metadataMap.put("badEventCount", badEventCount); - metadataMap.put("size", byteCount); - metadataMap.put("endDate", endDate.getTime()); - metadataMap.put("endEvent", endEvent); - metadataMap.put("endDate", endDate.getTime()); - metadataMap.put("endEvent", endEvent); - metadataMap.put("eventCount", eventCount); - metadataMap.put("evioFile", evioFile.getPath()); - metadataMap.put("hasEnd", hasEnd ? 1 : 0); - metadataMap.put("hasPrestart", hasPrestart ? 
//
// File: java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetadataAdapter.java
//
package org.hps.record.evio;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.freehep.record.loop.AbstractLoopListener;
import org.freehep.record.loop.LoopEvent;
import org.freehep.record.loop.LoopListener;
import org.freehep.record.loop.RecordEvent;
import org.freehep.record.loop.RecordListener;
import org.jlab.coda.jevio.EvioEvent;
import org.lcsim.util.log.DefaultLogFormatter;
import org.lcsim.util.log.LogUtil;

/**
 * Loop adapter that builds a list of {@link EvioFileMetadata}, one entry per
 * EVIO file processed by the record loop.
 * <p>
 * A new {@link EvioFileMetadataProcessor} is created whenever the loop's
 * record source moves to a new file; the previous file's accumulated metadata
 * is flushed into the output list at that point, and the final file's
 * metadata is flushed in {@link #finish(LoopEvent)}.
 *
 * @author Jeremy McCormick, SLAC
 */
public class EvioFileMetadataAdapter extends AbstractLoopListener implements RecordListener, LoopListener {

    /**
     * Class logger.
     * Fixed: previously initialized with {@code EvioFileMetadataProcessor.class}
     * (copy-paste error), which mislabeled this class's log records.
     */
    private static final Logger LOGGER = LogUtil.create(EvioFileMetadataAdapter.class,
            new DefaultLogFormatter(), Level.ALL);

    /** The EVIO file currently being processed; null until the first record arrives. */
    File currentEvioFile = null;

    /** The list of metadata created by the job, in file-processing order. */
    final List<EvioFileMetadata> metadataList = new ArrayList<EvioFileMetadata>();

    /** The processor that accumulates metadata for the current file; null before the first record. */
    EvioFileMetadataProcessor metadataProcessor = null;

    /**
     * Process a single EVIO record, rolling over to a new metadata processor
     * when the record source has advanced to a new file.
     *
     * @param recordEvent the record event wrapping the EVIO event
     * @throws RuntimeException if the record or its source has the wrong type
     */
    @Override
    public void recordSupplied(final RecordEvent recordEvent) {
        final EvioEvent evioEvent = (EvioEvent) recordEvent.getRecord();
        if (evioEvent == null) {
            throw new RuntimeException("Record has wrong type.");
        }
        final EvioFileSource evioSource = (EvioFileSource) recordEvent.getSource().getRecordSource();
        if (evioSource == null) {
            throw new RuntimeException("Wrong record source type.");
        }

        // First-time setup: latch the initial file and create its processor.
        if (currentEvioFile == null) {
            currentEvioFile = evioSource.getCurrentFile();
            LOGGER.info("got first file " + currentEvioFile.getPath());
            metadataProcessor = new EvioFileMetadataProcessor(currentEvioFile);
        }

        // Start of a new file: flush the finished file's metadata and start a
        // fresh processor.  NOTE(review): reference identity is assumed here,
        // i.e. EvioFileSource returns the same File object while a file is
        // being read — confirm, otherwise use equals().
        if (evioSource.getCurrentFile() != currentEvioFile) {
            LOGGER.info("putting metadata for " + currentEvioFile.getPath() + " into output");
            metadataList.add(metadataProcessor.createEvioFileMetadata());

            LOGGER.info("starting metadata for new file " + evioSource.getCurrentFile().getPath());
            currentEvioFile = evioSource.getCurrentFile();
            metadataProcessor = new EvioFileMetadataProcessor(currentEvioFile);
        }

        // Accumulate this event into the current file's metadata.
        metadataProcessor.process(evioEvent);
    }

    /**
     * End-of-job hook; flushes the metadata of the last (possibly only) file.
     *
     * @param event the loop event (contents ignored)
     */
    @Override
    public void finish(final LoopEvent event) {
        LOGGER.info("finish");
        // Guard against a job in which no records were ever supplied; the
        // original code dereferenced metadataProcessor unconditionally and
        // would throw NullPointerException on an empty loop.
        if (metadataProcessor != null) {
            metadataList.add(metadataProcessor.createEvioFileMetadata());
        }
        LOGGER.info("created " + metadataList.size() + " metadata objects in job");
        LOGGER.getHandlers()[0].flush();
    }

    /**
     * Get the list of metadata generated by the job.
     *
     * @return the list of metadata generated by the job
     */
    public List<EvioFileMetadata> getEvioFileMetadata() {
        return this.metadataList;
    }
}

//
// File: java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileMetadataProcessor.java
//
package org.hps.record.evio;

import java.io.File;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jlab.coda.jevio.EvioEvent;
import org.lcsim.util.log.DefaultLogFormatter;
import org.lcsim.util.log.LogUtil;

/**
 * Accumulates metadata for a single EVIO file (run number, event counts,
 * start/end dates and event numbers, PRESTART/END flags) as its events are
 * processed, and packages the result into an {@link EvioFileMetadata} object.
 *
 * @author Jeremy McCormick, SLAC
 */
public class EvioFileMetadataProcessor extends EvioEventProcessor {

    /** Class logger. */
    private static final Logger LOGGER = LogUtil.create(EvioFileMetadataProcessor.class,
            new DefaultLogFormatter(), Level.ALL);

    /** The EVIO file this processor describes. */
    private File evioFile = null;
    /** Start date from PRESTART or first timestamped physics event. */
    private Date startDate = null;
    /** End date from END event, or derived from the last physics timestamp. */
    private Date endDate = null;
    // NOTE(review): badEventCount is recorded into the metadata but is never
    // incremented anywhere in this class — either wire it up or remove it.
    private int badEventCount = 0;
    /** Number of physics events seen. */
    private int eventCount = 0;
    // NOTE(review): int can overflow for files over ~2 GB of event data;
    // consider long if setByteCount accepts it.
    private int byteCount = 0;
    /** Whether a PRESTART control event was seen. */
    private boolean hasPrestart = false;
    /** Whether an END control event was seen. */
    private boolean hasEnd = false;
    /** Event ID bank data of the most recent physics event. */
    private int[] eventIdData = null;
    /** Run number from control or physics head bank; null until seen. */
    private Integer run = null;
    /** Last physics event number; set in createEvioFileMetadata(). */
    private Integer endEvent = null;
    /** First physics event number. */
    private Integer startEvent = null;
    /** Last non-zero physics timestamp in milliseconds; null if none seen. */
    private Long lastTimestamp = null;

    /**
     * Create a processor for one EVIO file.
     *
     * @param evioFile the file whose events will be processed
     */
    EvioFileMetadataProcessor(final File evioFile) {
        this.evioFile = evioFile;
    }

    /**
     * Accumulate one EVIO event into the file's metadata.
     *
     * @param evioEvent the EVIO event to process
     */
    @Override
    public void process(final EvioEvent evioEvent) {
        byteCount += evioEvent.getTotalBytes();
        if (EventTagConstant.PRESTART.equals(evioEvent)) {
            LOGGER.info("found PRESTART");
            hasPrestart = true;
            final int[] controlEventData = EvioEventUtilities.getControlEventData(evioEvent);
            // Control timestamps are Unix seconds; convert to milliseconds.
            final long timestamp = controlEventData[0] * 1000L;
            startDate = new Date(timestamp);
            LOGGER.info("set start date to " + startDate + " from PRESTART");
            if (run == null) {
                run = controlEventData[1];
                LOGGER.info("set run to " + run);
            }
        } else if (EventTagConstant.END.equals(evioEvent)) {
            LOGGER.info("found END event");
            hasEnd = true;
            final int[] controlEventData = EvioEventUtilities.getControlEventData(evioEvent);
            final long timestamp = controlEventData[0] * 1000L;
            endDate = new Date(timestamp);
            LOGGER.info("set end date to " + endDate);
            if (run == null) {
                run = controlEventData[1];
                LOGGER.info("set run to " + run);
            }
        } else if (EvioEventUtilities.isPhysicsEvent(evioEvent)) {
            final int[] headBankData = EvioEventUtilities.getHeadBankData(evioEvent);
            // headBankData[3] is a Unix-seconds timestamp; 0 means "not set".
            if (startDate == null && headBankData[3] != 0) {
                startDate = new Date(headBankData[3] * 1000L);
                LOGGER.info("set start date to " + startDate + " from physics event");
            }
            if (run == null) {
                run = headBankData[1];
                LOGGER.info("set run to " + run + " from physics event");
            }
            eventIdData = EvioEventUtilities.getEventIdData(evioEvent);
            if (startEvent == null) {
                startEvent = eventIdData[0];
                LOGGER.info("set start event " + startEvent);
            }
            if (headBankData[3] != 0) {
                lastTimestamp = headBankData[3] * 1000L;
            }
            ++eventCount;
        }
    }

    /**
     * Package the accumulated state into a metadata object.
     *
     * @return the metadata for this file
     */
    EvioFileMetadata createEvioFileMetadata() {

        final EvioFileMetadata metadata = new EvioFileMetadata(evioFile);

        // Fall back to the last valid physics timestamp when no END event set
        // the end date.  Guard against lastTimestamp being null: the original
        // code auto-unboxed it into new Date(long) and threw a
        // NullPointerException when a file had neither an END event nor any
        // timestamped physics event.
        if (endDate == null) {
            if (lastTimestamp != null) {
                endDate = new Date(lastTimestamp);
                LOGGER.info("set end date to " + endDate + " from last timestamp " + lastTimestamp);
            } else {
                LOGGER.warning("no end date available for " + evioFile.getPath()
                        + "; no END event or timestamped physics event was seen");
            }
        }

        // The last event ID seen is the end event number.
        if (eventIdData != null) {
            endEvent = eventIdData[0];
            LOGGER.info("set end event " + endEvent);
        }

        // Copy accumulated values onto the metadata object.
        metadata.setStartDate(startDate);
        metadata.setEndDate(endDate);
        metadata.setBadEventCount(badEventCount);
        metadata.setByteCount(byteCount);
        metadata.setEventCount(eventCount);
        metadata.setHasPrestart(hasPrestart);
        metadata.setHasEnd(hasEnd);
        metadata.setRun(run);
        metadata.setEndEvent(endEvent);
        metadata.setStartEvent(startEvent);

        return metadata;
    }
}