001    /**
002     * 
003     */
004    package org.wdssii.core;
005    
006    import java.io.File;
007    import java.io.FilenameFilter;
008    
009    import org.apache.commons.logging.Log;
010    import org.apache.commons.logging.LogFactory;
011    import org.wdssii.core.fam.FamIndexHelper;
012    import org.wdssii.core.fam.FamIndexHelperFactory;
013    
014    /**
015     * Abstract class of algorithms
016     * 
017     * @author lakshman
018     * 
019     */
020    public abstract class Ingestor extends WDSSIIProgram {
021            private static Log log = LogFactory.getLog(Ingestor.class);
022    
023            private String inputDir = "/tmp/input";
024            private String[] filenamePatterns = new String[]{ ".nc", ".netcdf" };
025            private boolean exitAfterInitialRead = false;
026            
027            private class FilenamePatternMatcher implements FilenameFilter {
028                    @Override
029                    public boolean accept(File dir, String name) {
030                            for (String pattern : filenamePatterns){
031                                    if (name.contains(pattern)){
032                                            return true;
033                                    }
034                            }
035                            return false;
036                    }
037            }
038            
039            private FamIndexHelper helper = FamIndexHelperFactory.newHelper(new FilenamePatternMatcher());
040            
041            public String getInputDir() {
042                    return inputDir;
043            }
044    
045            public void setInputDir(String inputDir) {
046                    this.inputDir = inputDir;
047            }
048    
049            public String getFilenamePatterns() {
050                    StringBuilder sb = new StringBuilder();
051                    for (int i=0; i < filenamePatterns.length; ++i){
052                            sb.append(filenamePatterns[i]);
053                            if (i != (filenamePatterns.length-1)){
054                                    sb.append(" ");
055                            }
056                    }
057                    return sb.toString();
058            }
059    
060            public void setFilenamePatterns(String pattern) {
061                    this.filenamePatterns = StringUtil.split(pattern).toArray(new String[0]);
062            }
063    
064            
065            public boolean isExitAfterInitialRead() {
066                    return exitAfterInitialRead;
067            }
068    
069            public void setExitAfterInitialRead(boolean exitAfterInitialRead) {
070                    this.exitAfterInitialRead = exitAfterInitialRead;
071            }
072    
073            /**
074             * Monitor input directory for files, then process them.
075             */
076            @Override
077            public void execute() {
078                    File[] files = helper.getInitialFiles(inputDir);
079                    if (!isRealTime()) {
080                            // process the older data
081                            processFiles(files);
082                            if (exitAfterInitialRead){
083                                    return;
084                            }
085                    }
086                    while (true){
087                            files = helper.getNewFiles();
088                            processFiles(files);
089                    }
090            }
091            
092            private void processFiles(File[] files){
093                    for (File f : files){
094                            try{
095                                    doIngest(f);
096                            } catch (Throwable e){
097                                    log.warn("Failed to process " + f, e);
098                            }
099                    }
100            }
101    
102            /** Read the file, do the conversion, write the output and notify the index. */
103            protected abstract void doIngest(File f);
104    
105            /** initializes wdssii */
106            protected Ingestor() {
107                    super();
108            }
109    }