001 /** 002 * 003 */ 004 package org.wdssii.core; 005 006 import java.io.File; 007 import java.io.FilenameFilter; 008 009 import org.apache.commons.logging.Log; 010 import org.apache.commons.logging.LogFactory; 011 import org.wdssii.core.fam.FamIndexHelper; 012 import org.wdssii.core.fam.FamIndexHelperFactory; 013 014 /** 015 * Abstract class of algorithms 016 * 017 * @author lakshman 018 * 019 */ 020 public abstract class Ingestor extends WDSSIIProgram { 021 private static Log log = LogFactory.getLog(Ingestor.class); 022 023 private String inputDir = "/tmp/input"; 024 private String[] filenamePatterns = new String[]{ ".nc", ".netcdf" }; 025 private boolean exitAfterInitialRead = false; 026 027 private class FilenamePatternMatcher implements FilenameFilter { 028 @Override 029 public boolean accept(File dir, String name) { 030 for (String pattern : filenamePatterns){ 031 if (name.contains(pattern)){ 032 return true; 033 } 034 } 035 return false; 036 } 037 } 038 039 private FamIndexHelper helper = FamIndexHelperFactory.newHelper(new FilenamePatternMatcher()); 040 041 public String getInputDir() { 042 return inputDir; 043 } 044 045 public void setInputDir(String inputDir) { 046 this.inputDir = inputDir; 047 } 048 049 public String getFilenamePatterns() { 050 StringBuilder sb = new StringBuilder(); 051 for (int i=0; i < filenamePatterns.length; ++i){ 052 sb.append(filenamePatterns[i]); 053 if (i != (filenamePatterns.length-1)){ 054 sb.append(" "); 055 } 056 } 057 return sb.toString(); 058 } 059 060 public void setFilenamePatterns(String pattern) { 061 this.filenamePatterns = StringUtil.split(pattern).toArray(new String[0]); 062 } 063 064 065 public boolean isExitAfterInitialRead() { 066 return exitAfterInitialRead; 067 } 068 069 public void setExitAfterInitialRead(boolean exitAfterInitialRead) { 070 this.exitAfterInitialRead = exitAfterInitialRead; 071 } 072 073 /** 074 * Monitor input directory for files, then process them. 075 */ 076 @Override 077 public void execute() { 078 File[] files = helper.getInitialFiles(inputDir); 079 if (!isRealTime()) { 080 // process the older data 081 processFiles(files); 082 if (exitAfterInitialRead){ 083 return; 084 } 085 } 086 while (true){ 087 files = helper.getNewFiles(); 088 processFiles(files); 089 } 090 } 091 092 private void processFiles(File[] files){ 093 for (File f : files){ 094 try{ 095 doIngest(f); 096 } catch (Throwable e){ 097 log.warn("Failed to process " + f, e); 098 } 099 } 100 } 101 102 /** Read the file, do the conversion, write the output and notify the index. */ 103 protected abstract void doIngest(File f); 104 105 /** initializes wdssii */ 106 protected Ingestor() { 107 super(); 108 } 109 }