/**
 * 
 */
package org.wdssii.polarmerger;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wdssii.core.Algorithm;
import org.wdssii.core.BuilderFactory;
import org.wdssii.core.DataEncoder;
import org.wdssii.core.DataType;
import org.wdssii.core.IndexRecord;
import org.wdssii.core.Location;
import org.wdssii.core.PolarGrid;
import org.wdssii.core.Radial;
import org.wdssii.core.RadialSet;

/**
 * Accumulates the radials of incoming {@link RadialSet} records of one input
 * type, and periodically writes one merged {@link PolarGrid} per elevation
 * step (plus, optionally, a composite grid holding the per-cell maximum
 * across all elevation steps).
 *
 * @author lakshman
 */
public class PolarMerger extends Algorithm {
    private static final Log log = LogFactory.getLog(PolarMerger.class);

    /** Ways in which overlapping observations may be combined. */
    public enum MergerStrategy { DistanceWeighted, NearestNeighbor, Latest }

    public static void main(String[] args) {
        final PolarMerger alg = new PolarMerger();
        alg.setupAndExecute(args);
    }

    // ------------------------------------------------------------------
    // Configuration (bean properties)
    // ------------------------------------------------------------------

    /** Only records whose data type equals this value are processed. */
    private String inputType = "Reflectivity";

    private boolean compositeEnabled = true;

    private int writeIntervalSeconds = 60;

    private boolean writeAfterEveryInputReceived = false;

    private float maxElevation = 20.0f;

    /** Output spacing in degrees; used for both azimuth and elevation steps. */
    private float azimuthalSpacingDegrees = 0.5f;

    private float gateWidthKms = 0.5f;

    private int numGates = 500;

    private float ageOffInMinutes = 10;

    /** Number of output elevation steps; derived from maxElevation and the spacing. */
    private int numElev = Math.round(maxElevation / azimuthalSpacingDegrees);

    private MergerStrategy mergerStrategy = MergerStrategy.DistanceWeighted;

    // ------------------------------------------------------------------
    // Runtime state
    // ------------------------------------------------------------------

    /** Number of radials ingested since the last successful write. */
    private int numNewRadials = 0;

    /** Accumulated observations; created lazily on the first radial set. */
    private MergeableObservations observations = null;

    /** Created together with {@link #observations}. */
    private PowerDensityLookup lookup = null;

    /** Time of the most recent radial set seen so far. */
    private Date lastUpdate = null;

    /** Value of {@link #lastUpdate} at the time of the last write. */
    private Date lastWrite = null;

    private Location radarLocation = null;

    private float nyquist = 0;

    /** Output type name; when empty it is taken from the first radial set. */
    private String outputType = "";

    /** Range to first gate passed to every output grid. */
    private final float distToFirst = 0;

    public float getAzimuthalSpacingDegrees() {
        return azimuthalSpacingDegrees;
    }

    /** Changes the output spacing and discards the accumulated merge state. */
    public void setAzimuthalSpacingDegrees(float azimuthalSpacingDegrees) {
        this.azimuthalSpacingDegrees = azimuthalSpacingDegrees;
        reset();
    }

    public boolean isCompositeEnabled() {
        return compositeEnabled;
    }

    public void setCompositeEnabled(boolean compositeEnabled) {
        this.compositeEnabled = compositeEnabled;
    }

    public float getMaxElevation() {
        return maxElevation;
    }

    /** Changes the maximum elevation and discards the accumulated merge state. */
    public void setMaxElevation(float elevationDegrees) {
        this.maxElevation = elevationDegrees;
        reset();
    }

    /**
     * Drops the lookup and the accumulated observations (they are rebuilt
     * from the current settings when the next radial set arrives) and
     * recomputes the number of elevation steps.
     */
    private void reset() {
        this.observations = null;
        this.lookup = null;
        this.numElev = Math.round(maxElevation / azimuthalSpacingDegrees);
    }

    public float getGateWidthKms() {
        return gateWidthKms;
    }

    /**
     * Changes the output gate width. The lookup is constructed with this
     * value, so the accumulated merge state is discarded to keep the lookup
     * and the output grids in agreement.
     */
    public void setGateWidthKms(float gateWidthKms) {
        this.gateWidthKms = gateWidthKms;
        reset();
    }

    public String getInputType() {
        return inputType;
    }

    public void setInputType(String inputType) {
        this.inputType = inputType;
    }

    public int getNumGates() {
        return numGates;
    }

    public void setNumGates(int numGates) {
        this.numGates = numGates;
    }

    public int getWriteIntervalSeconds() {
        return writeIntervalSeconds;
    }

    public void setWriteIntervalSeconds(int writeIntervalSeconds) {
        this.writeIntervalSeconds = writeIntervalSeconds;
    }

    public float getAgeOffInMinutes() {
        return ageOffInMinutes;
    }

    public void setAgeOffInMinutes(float ageOffInMinutes) {
        this.ageOffInMinutes = ageOffInMinutes;
    }

    public String getOutputType() {
        return outputType;
    }

    public void setOutputType(String outputType) {
        this.outputType = outputType;
    }

    /**
     * One of the MergerStrategy enums
     * @return mergerStrategy
     */
    public String getMergerStrategy() {
        return mergerStrategy.toString();
    }

    /**
     * One of the MergerStrategy enums. The observations container is
     * constructed with the strategy, so the accumulated merge state is
     * discarded for the new strategy to take effect.
     *
     * @param mergerStrategy exact name of a {@link MergerStrategy} constant
     * @throws IllegalArgumentException if the name matches no constant
     * @throws NullPointerException if the name is null
     */
    public void setMergerStrategy(String mergerStrategy) {
        this.mergerStrategy = MergerStrategy.valueOf(mergerStrategy);
        reset();
    }

    public boolean isWriteAfterEveryInputReceived() {
        return writeAfterEveryInputReceived;
    }

    /** If set to true, then writeIntervalSeconds is ignored. */
    public void setWriteAfterEveryInputReceived(boolean writeAfterEveryInputReceived) {
        this.writeAfterEveryInputReceived = writeAfterEveryInputReceived;
    }

    /**
     * Processes one index record: records of other data types are ignored;
     * matching records are built into a RadialSet, merged into the
     * accumulated observations, and outputs are written if they are due.
     * Any exception is logged rather than propagated.
     *
     * @param rec the record to process
     */
    public void handleRecord(IndexRecord rec) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Received: " + rec);
            }
            if (!rec.getDataType().equals(inputType)) {
                return;
            }
            long start = 0;
            if (log.isInfoEnabled()) {
                start = System.currentTimeMillis();
                log.info("Starting to process " + rec);
            }
            RadialSet rs = (RadialSet) BuilderFactory.createDataType(rec);
            update(rs);
            if (log.isInfoEnabled()) {
                long howlong = System.currentTimeMillis() - start;
                log.info("Took " + howlong / 1000.0 + "s to process " + rec);
            }
            computeAndWriteIfNeeded();
        } catch (Exception e) {
            log.error("Error handling " + rec, e);
        }
    }

    /**
     * Writes outputs when writing after every input is enabled, or when the
     * data time has advanced past the last write by more than the write
     * interval. Does nothing until at least one radial set has been seen.
     */
    private void computeAndWriteIfNeeded() {
        if (lastWrite == null || lastUpdate == null) {
            return;
        }
        long timeSinceLastWrite = lastUpdate.getTime() - lastWrite.getTime();
        // long arithmetic: seconds * 1000 overflows int for large intervals
        long writeIntervalMillis = getWriteIntervalSeconds() * 1000L;
        if (writeAfterEveryInputReceived || timeSinceLastWrite > writeIntervalMillis) {
            if (log.isInfoEnabled()) {
                log.info(timeSinceLastWrite / 1000.0
                        + " seconds since lastWrite: computing and writing outputs");
            }
            computeAndWriteOutputs();
        }
    }

    /**
     * Merges every radial of the given radial set into the accumulated
     * observations and advances {@link #lastUpdate} if the set is newer.
     */
    private synchronized void update(RadialSet rs) {
        initIfNeeded(rs);
        if (lastUpdate == null || rs.getTime().after(lastUpdate)) {
            lastUpdate = rs.getTime();
            if (lastWrite == null) {
                // first time around
                lastWrite = lastUpdate;
            }
        }
        Radial[] radials = rs.getRadials();
        for (Radial radial : radials) {
            update(radial, rs.getElevation(), rs.getRangeToFirstGateKms(), rs.getTime());
        }
        numNewRadials += radials.length;
        if (log.isDebugEnabled()) {
            log.debug("Finished updating with " + radials.length);
        }
    }

    /**
     * Records the radar location and Nyquist value of the radial set, creates
     * the lookup and observations container if they do not exist yet, and
     * defaults the output type to the radial set's type name when unset.
     */
    private synchronized void initIfNeeded(RadialSet rs) {
        float beamwidth = rs.getBeamWidth();
        float gatewidth = rs.getGateWidthKms();
        this.radarLocation = rs.getRadarLocation();
        this.nyquist = rs.getNyquist();

        // init grid if needed
        if (observations == null) {
            lookup = new PowerDensityLookup(beamwidth, azimuthalSpacingDegrees,
                    gatewidth, gateWidthKms);
            observations = new MergeableObservations(mergerStrategy, lookup);
        }

        if (outputType == null || outputType.length() == 0) {
            outputType = rs.getTypeName();
        }
    }

    /**
     * Adds one observation per non-missing gate of the radial, indexed by the
     * elevation, azimuth and range rounded to the lookup's spacings.
     */
    private synchronized void update(Radial radial, float elevation, float dist_to_first_gate, Date date) {
        int centerAz = Math.round(radial.getMidAzimuth()
                / lookup.getAzimuthalSpacingDegrees());
        // elevation steps use the same angular spacing as azimuth
        int centerElev = Math.round(elevation
                / lookup.getAzimuthalSpacingDegrees());

        float[] radialValues = radial.getValues();
        for (int r = 0; r < radialValues.length; ++r) {
            float value = radialValues[r];
            if (value != DataType.MissingData) {
                float range = radial.getGateWidthKms() * r + dist_to_first_gate;
                int centerRange = Math.round(range / lookup.getGateSpacingKms());
                observations.add(new Observation(centerElev, centerAz, centerRange, value, date));
            }
        }
    }

    /**
     * Prunes observations older than the age-off window, fills one merged
     * grid per elevation step, runs the derived-product algorithms, and
     * writes every resulting grid. Does nothing when no new radials have
     * arrived since the last write or when the merge state has been reset.
     */
    protected synchronized void computeAndWriteOutputs() {
        if (numNewRadials <= 0 || observations == null || lookup == null) {
            log.debug("Nothing to write");
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Writing new product with " + numNewRadials
                    + " updated/new radials");
        }

        // Prune. Cast to long: an int cast would cap the window at Integer.MAX_VALUE ms.
        long ageOffMillis = (long) (ageOffInMinutes * 60 * 1000);
        Date pruneTime = new Date(lastUpdate.getTime() - ageOffMillis);
        int numPruned = observations.pruneBefore(pruneTime);
        if (log.isInfoEnabled()) {
            log.info("Pruned " + numPruned + " observations before "
                    + pruneTime);
        }

        // Compute output polar grids
        PolarGrid[] mergedGrids = new PolarGrid[numElev];
        for (int e = 0; e < numElev; ++e) {
            float elevation = e * azimuthalSpacingDegrees;
            // Create polar grid to hold the merged value. Initialize at zero
            mergedGrids[e] = new PolarGrid(radarLocation, lastUpdate, outputType,
                    azimuthalSpacingDegrees, gateWidthKms, elevation,
                    lookup.getBeamwidth(), nyquist, distToFirst, numGates, 0f);
        }
        observations.fillMergedValues(mergedGrids);

        // Compute algorithms
        PolarGrid[] algs = performAlgorithms(mergedGrids);

        PolarGrid[] grids = new PolarGrid[mergedGrids.length + algs.length];
        System.arraycopy(mergedGrids, 0, grids, 0, mergedGrids.length);
        System.arraycopy(algs, 0, grids, mergedGrids.length, algs.length);

        // Write
        for (PolarGrid pg : grids) {
            String subtype = getSubTypeForElevation(pg.getElevation());
            DataEncoder.writeDataAndNotify(pg, getOutputDir(), new String[] { subtype });
        }

        numNewRadials = 0;
        lastWrite = lastUpdate;
    }

    /**
     * Computes derived products from the merged grids. Currently this is at
     * most one composite grid whose cells hold the maximum of the
     * corresponding cells across all input grids.
     *
     * @param grids merged grids, one per elevation step
     * @return the derived grids; empty when compositing is disabled or no
     *         input grids were supplied
     */
    private PolarGrid[] performAlgorithms(PolarGrid[] grids) {
        List<PolarGrid> result = new ArrayList<PolarGrid>();
        // grids is empty when numElev rounds down to zero; there is then
        // no example grid to derive the composite from
        if (isCompositeEnabled() && grids.length > 0) {
            PolarGrid example = grids[0];
            PolarGrid composite = new PolarGrid(example, example.getTypeName() + "Composite");
            float[][] compValues = composite.getValues();
            for (PolarGrid grid : grids) {
                float[][] values = grid.getValues();
                for (int i = 0; i < values.length; ++i) {
                    for (int j = 0; j < values[0].length; ++j) {
                        compValues[i][j] = Math.max(compValues[i][j], values[i][j]);
                    }
                }
            }
            result.add(composite);
        }
        return result.toArray(new PolarGrid[0]);
    }

}