1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 package org.ogf.graap.wsag.server.monitoring;
36
37 import java.text.MessageFormat;
38 import java.text.ParseException;
39 import java.util.Arrays;
40 import java.util.Date;
41 import java.util.Iterator;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Observable;
45 import java.util.Observer;
46 import java.util.Vector;
47
48 import org.apache.log4j.Logger;
49 import org.apache.xmlbeans.XmlBoolean;
50 import org.apache.xmlbeans.XmlInt;
51 import org.apache.xmlbeans.XmlObject;
52 import org.apache.xmlbeans.XmlString;
53 import org.ogf.graap.wsag.api.Agreement;
54 import org.ogf.graap.wsag.api.logging.LogMessage;
55 import org.ogf.graap.wsag.server.accounting.IAccountingSystem;
56 import org.ogf.graap.wsag.server.accounting.SimpleAccountingSystemLogger;
57 import org.ogf.graap.wsag.server.api.IAgreementContext;
58 import org.ogf.graap.wsag.server.api.impl.AgreementContext;
59 import org.ogf.graap.wsag.server.persistence.impl.PersistentAgreementContainer;
60 import org.ogf.schemas.graap.wsAgreement.AgreementContextType;
61 import org.ogf.schemas.graap.wsAgreement.AgreementPropertiesType;
62 import org.ogf.schemas.graap.wsAgreement.AgreementStateType;
63 import org.ogf.schemas.graap.wsAgreement.GuaranteeTermStateType;
64 import org.ogf.schemas.graap.wsAgreement.ServiceTermStateType;
65 import org.ogf.schemas.graap.wsAgreement.TermTreeType;
66 import org.ogf.schemas.graap.wsAgreement.TerminateInputType;
67 import org.quartz.CronExpression;
68 import org.quartz.CronTrigger;
69 import org.quartz.JobDetail;
70 import org.quartz.Scheduler;
71 import org.quartz.SchedulerException;
72 import org.quartz.SchedulerFactory;
73 import org.quartz.Trigger;
74 import org.quartz.impl.StdSchedulerFactory;
75
76
77
78
79
80
81
82
83
84 public class MonitorableAgreement extends Observable
85 implements Agreement, Observer
86 {
87
88 private static final Logger LOG = Logger.getLogger( MonitorableAgreement.class );
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 public static final String MONITORING_ACTIVE = "org.wsag4j.monitoring.isActive";
105
106
107
108
109
110
111
112
113
114
115 public static final String MONITORING_CRON = "org.wsag4j.monitoring.cron";
116
117
118
119
120
121
122
123
124
125
126
127
128
129 public static final String MONITORING_HANDLER = "org.wsag4j.monitoring.handler";
130
131
132
133
134
135
136
137
138
139
140 public static final String MONITORING_HANDLER_COUNT = "org.wsag4j.monitoring.handler.count";
141
142
143
144
145 private Agreement agreementInstance;
146
147 private IAgreementContext executionContext = new AgreementContext( this );
148
149 private final List<IServiceTermMonitoringHandler> monitoringHandler =
150 new Vector<IServiceTermMonitoringHandler>();
151
152 private Scheduler scheduler;
153
154 private String jobName;
155
156 private static final String JOB_GROUP = "WSAG4J";
157
158 private IAccountingSystem accountingSystem = new SimpleAccountingSystemLogger();
159
160 private boolean monitoring = false;
161
162
163
164
165 public boolean isMonitoring()
166 {
167 return monitoring;
168 }
169
170
171
172
173 private static final String DEFAULT_SCHEDULE = "0 0/1 * * * ?";
174
175 private String cronExpression = DEFAULT_SCHEDULE;
176
177
178
179
180
181
182
183
184
185 public MonitorableAgreement( Agreement agreement )
186 {
187 this.agreementInstance = agreement;
188
189
190
191
192 agreement.addObserver( this );
193
194
195
196
197 executionContext.getExecutionProperties().putAll( agreement.getExecutionContext() );
198 executionContext.getTransientExecutionProperties().putAll( agreement.getTransientExecutionContext() );
199
200 initializeScheduler();
201 }
202
203
204
205
206
207
208
209
210
211 public MonitorableAgreement( PersistentAgreementContainer persistentAgreementContainer )
212 {
213
214 throw new UnsupportedOperationException( "Not supported yet." );
215 }
216
217 private void initializeScheduler()
218 {
219
220
221
222 synchronized ( SchedulerFactory.class )
223 {
224
225 SchedulerFactory factory = new StdSchedulerFactory();
226
227 try
228 {
229 scheduler = factory.getScheduler();
230
231 if ( !scheduler.isStarted() )
232 {
233 scheduler.start();
234 }
235 }
236 catch ( SchedulerException e )
237 {
238 throw new IllegalStateException( "Failed to instantiate Quartz scheduler.", e );
239 }
240
241 }
242
243 }
244
245 private IMonitoringContext initializeMonitoringContext()
246 {
247
248 IMonitoringContext monitoringContext = new MonitoringContext();
249
250
251
252
253 monitoringContext.setProperties( executionContext.getExecutionProperties() );
254
255
256
257
258 monitoringContext.setTransientProperties( executionContext.getTransientExecutionProperties() );
259
260
261
262
263
264
265
266
267
268
269 monitoringContext.setMonitoringHandler( new IServiceTermMonitoringHandler[0] );
270
271 for ( int i = 0; i < monitoringHandler.size(); i++ )
272 {
273 monitoringContext.addMonitoringHandler( monitoringHandler.get( i ) );
274 }
275
276 monitoringContext.setAccountingSystem( accountingSystem );
277
278
279
280
281 monitoringContext.getTransientProperties().put(
282 IMonitoringContext.WSAG4J_AGREEMENT_EXECUTION_CONTEXT, getExecutionContext() );
283
284 return monitoringContext;
285 }
286
287 private synchronized void scheduleMonitoringJobs( IMonitoringContext monitoringContext ) throws Exception
288 {
289
290
291
292
293
294
295
296 jobName = initializeJobName();
297
298 Trigger agreementMonitoringTrigger = createCronTrigger( jobName );
299
300
301
302
303 JobDetail agreementMonitoringDetail = new JobDetail( jobName, JOB_GROUP, AgreementMonitorJob.class );
304
305 agreementMonitoringDetail.getJobDataMap().put( AgreementMonitorJob.WSAG4J_AGREEMENT_INSTANCE,
306 agreementInstance );
307 agreementMonitoringDetail.getJobDataMap().put( AgreementMonitorJob.WSAG4J_MONITORING_CONTEXT,
308 monitoringContext );
309
310 scheduler.scheduleJob( agreementMonitoringDetail, agreementMonitoringTrigger );
311
312
313
314
315
316 }
317
318 private String initializeJobName() throws SchedulerException
319 {
320 List<String> names = Arrays.asList( scheduler.getJobNames( JOB_GROUP ) );
321
322 String name =
323 MessageFormat.format( "WSAG4J_MONITORING_JOB_{0}", new Object[] { new Date().getTime() } );
324
325 while ( names.contains( name ) )
326 {
327 name = MessageFormat.format( "WSAG4J_MONITORING_JOB_{0}", new Object[] { new Date().getTime() } );
328 try
329 {
330 wait( 10 );
331 }
332 catch ( InterruptedException e )
333 {
334
335 }
336 }
337
338 return name;
339 }
340
341
342
343
344 private void saveHandlerToExecutionContext()
345 {
346 int handlerCount = 0;
347 Iterator<IServiceTermMonitoringHandler> handler = monitoringHandler.iterator();
348
349 while ( handler.hasNext() )
350 {
351
352 String handlerClass = handler.next().getClass().getName();
353
354 String key = MONITORING_HANDLER + "." + handlerCount;
355 XmlString value = XmlString.Factory.newValue( handlerClass );
356
357 getMonitoringContext().getExecutionProperties().put( key, value );
358
359 handlerCount++;
360 }
361
362 XmlInt value = XmlInt.Factory.newValue( Integer.valueOf( handlerCount ) );
363 getMonitoringContext().getExecutionProperties().put( MONITORING_HANDLER_COUNT, value );
364 }
365
366
367
368
369
370 private void initializeHandlerFromExecutionContext()
371 {
372 int handlerCount = 0;
373 monitoringHandler.clear();
374
375 XmlInt count =
376 (XmlInt) getMonitoringContext().getExecutionProperties().get( MONITORING_HANDLER_COUNT );
377 if ( count != null )
378 {
379 handlerCount = count.getIntValue();
380 }
381
382 while ( handlerCount > 0 )
383 {
384 handlerCount--;
385
386 String key = MONITORING_HANDLER + "." + handlerCount;
387 XmlString value = (XmlString) getMonitoringContext().getExecutionProperties().get( key );
388
389 LOG.debug( "initialize agreement monitoring handler" );
390
391
392
393
394 try
395 {
396 String className = value.getStringValue();
397 LOG.debug( LogMessage.getMessage( "instantiate monitoring handler ''{0}''", className ) );
398
399 Class<IServiceTermMonitoringHandler> clazz;
400 try
401 {
402
403
404
405 @SuppressWarnings( "unchecked" )
406 Class<IServiceTermMonitoringHandler> convert =
407 (Class<IServiceTermMonitoringHandler>) this.getClass().getClassLoader()
408 .loadClass( className );
409
410 clazz = convert;
411 }
412 catch ( ClassCastException e )
413 {
414 final String msgNotRequiredInterface =
415 "monitoring handler must implement the 'IServiceTermMonitoringHandler' interface.";
416 throw new Exception( msgNotRequiredInterface, e );
417 }
418
419
420
421
422 addMonitoringHandler( clazz.newInstance() );
423
424 LOG.debug( LogMessage.getMessage( "successfully instantiated monitoring handler ''{0}''",
425 className ) );
426 }
427 catch ( Exception e )
428 {
429 String msgText = "re-initializing monitorable agreement failed: {0}";
430 LogMessage message = LogMessage.getMessage( msgText, e.getMessage() );
431 LOG.error( message, e );
432 }
433 }
434
435 }
436
437
438
439
440
441 @Deprecated
442 public IAgreementContext getMonitoringContext()
443 {
444 return executionContext;
445 }
446
447
448
449
450
451 public void setExecutionContext( IAgreementContext executionContext )
452 {
453 this.executionContext = executionContext;
454 }
455
456
457
458
459
460
461 public void addMonitoringHandler( IServiceTermMonitoringHandler handler )
462 {
463 monitoringHandler.add( handler );
464 }
465
466
467
468
469
470
471 public IServiceTermMonitoringHandler[] getMonitoringHandler()
472 {
473 return monitoringHandler.toArray( new IServiceTermMonitoringHandler[monitoringHandler.size()] );
474 }
475
476
477
478
479 public String getCronExpression()
480 {
481 return cronExpression;
482 }
483
484
485
486
487
488 public void setCronExpression( String cronExpression )
489 {
490 this.cronExpression = cronExpression;
491 }
492
493 private Trigger createCronTrigger( String name ) throws Exception
494 {
495
496
497
498 CronTrigger trigger = new CronTrigger();
499
500 try
501 {
502 if ( CronExpression.isValidExpression( cronExpression ) )
503 {
504 trigger.setCronExpression( cronExpression );
505 }
506 else
507 {
508 LOG.error( LogMessage.getMessage(
509 "Invalid cron expression ({0}). Using default monitoring schedule ({1}).",
510 cronExpression, DEFAULT_SCHEDULE ) );
511 trigger.setCronExpression( DEFAULT_SCHEDULE );
512 }
513 }
514 catch ( ParseException e )
515 {
516 String msgText = "Invalid default schedule <{0}>. Monitoring not scheduled.";
517 String message = LogMessage.format( msgText, DEFAULT_SCHEDULE );
518 throw new Exception( message, e );
519 }
520
521 trigger.setGroup( JOB_GROUP );
522 trigger.setName( name );
523
524 return trigger;
525 }
526
527
528
529
530
531
532
533 public void startMonitoring() throws Exception
534 {
535
536
537
538 XmlBoolean isMonitoring = XmlBoolean.Factory.newValue( true );
539 XmlString cron = XmlString.Factory.newValue( cronExpression );
540 getMonitoringContext().getExecutionProperties().put( MONITORING_ACTIVE, isMonitoring );
541 getMonitoringContext().getExecutionProperties().put( MONITORING_CRON, cron );
542
543
544
545
546 saveHandlerToExecutionContext();
547
548
549
550
551 IMonitoringContext monitoringContext = initializeMonitoringContext();
552
553
554
555
556 try
557 {
558 scheduleMonitoringJobs( monitoringContext );
559 }
560 catch ( Exception e )
561 {
562 final String msgText = "Error scheduling monitoring jobs. Reason: {0}";
563 String message = LogMessage.format( msgText, e.getMessage() );
564 LOG.error( message, e );
565
566 throw new Exception( message, e );
567 }
568
569 this.monitoring = true;
570 }
571
572
573
574
575
576
577
578 public void stopMonitoring() throws Exception
579 {
580 try
581 {
582 XmlBoolean isMonitoring = XmlBoolean.Factory.newValue( false );
583 getMonitoringContext().getExecutionProperties().put( MONITORING_ACTIVE, isMonitoring );
584 getAgreementInstance().getExecutionContext().put( MONITORING_ACTIVE, isMonitoring );
585
586
587
588
589
590
591
592 scheduler.unscheduleJob( jobName, JOB_GROUP );
593 }
594 catch ( SchedulerException e )
595 {
596 String message = "Error stoping the agreement monitoring. Reason: " + e.getMessage();
597 LOG.error( message );
598
599 throw new Exception( message, e );
600 }
601
602 this.monitoring = false;
603 }
604
605
606
607
608 @Override
609 public void terminate( TerminateInputType reason )
610 {
611 try
612 {
613 stopMonitoring();
614 }
615 catch ( Exception ex )
616 {
617 LOG.error( "The agreement monitoring scheduler was not stoped" );
618
619 if ( LOG.isDebugEnabled() )
620 {
621 LOG.debug( ex );
622 }
623 }
624
625 try
626 {
627 agreementInstance.terminate( reason );
628 }
629 catch ( Exception ex )
630 {
631 LOG.error( "The agreement could not be terminated." );
632
633 if ( LOG.isDebugEnabled() )
634 {
635 LOG.debug( ex );
636 }
637 }
638
639 }
640
641
642
643
644
645
646
647
648
649
650
651 public void notifyReload() throws Exception
652 {
653 agreementInstance.notifyReload( executionContext.getExecutionProperties() );
654
655
656
657
658 initializeHandlerFromExecutionContext();
659
660 Map<String, XmlObject> executionProperties = getMonitoringContext().getExecutionProperties();
661 XmlString cron = (XmlString) executionProperties.get( MonitorableAgreement.MONITORING_CRON );
662
663 if ( cron != null )
664 {
665 cronExpression = cron.getStringValue();
666 }
667 else
668 {
669 cronExpression = DEFAULT_SCHEDULE;
670 }
671
672 boolean isActive = false;
673 if ( executionProperties.containsKey( MONITORING_ACTIVE ) )
674 {
675 isActive =
676 ( (XmlBoolean) executionProperties.get( MonitorableAgreement.MONITORING_ACTIVE ) ).getBooleanValue();
677 }
678
679 if ( isActive )
680 {
681 startMonitoring();
682 }
683 }
684
685
686
687
688
689 @Override
690 public String getAgreementId()
691 {
692 return agreementInstance.getAgreementId();
693 }
694
695
696
697
698
699 @Override
700 public AgreementContextType getContext()
701 {
702 return agreementInstance.getContext();
703 }
704
705
706
707
708
709 @Override
710 public GuaranteeTermStateType[] getGuaranteeTermStates()
711 {
712 return agreementInstance.getGuaranteeTermStates();
713 }
714
715
716
717
718
719 @Override
720 public String getName()
721 {
722 return agreementInstance.getName();
723 }
724
725
726
727
728
729 @Override
730 public ServiceTermStateType[] getServiceTermStates()
731 {
732 return agreementInstance.getServiceTermStates();
733 }
734
735
736
737
738
739 @Override
740 public AgreementStateType getState()
741 {
742 return agreementInstance.getState();
743 }
744
745
746
747
748
749 @Override
750 public TermTreeType getTerms()
751 {
752 return agreementInstance.getTerms();
753 }
754
755
756
757
758 public Agreement getAgreementInstance()
759 {
760 return agreementInstance;
761 }
762
763
764
765
766
767 public void setAccountingSystem( IAccountingSystem accountingSystem )
768 {
769 if ( accountingSystem != null )
770 {
771 this.accountingSystem = accountingSystem;
772 }
773 }
774
775
776
777
778 public IAccountingSystem getAccountingSystem()
779 {
780 return accountingSystem;
781 }
782
783
784
785
786
787
788
789
790
791
792 @Override
793 public void update( Observable o, Object arg )
794 {
795 setChanged();
796 notifyObservers();
797
798
799
800
801
802
803
804
805
806 }
807
808
809
810
811 @Override
812 public boolean validate()
813 {
814 return agreementInstance.validate();
815 }
816
817
818
819
820 @Override
821 public void notifyReload( Map<String, XmlObject> executionCtx )
822 {
823 agreementInstance.notifyReload( executionCtx );
824 }
825
826
827
828
829 @Override
830 public void setAgreementId( String agreementId )
831 {
832 agreementInstance.setAgreementId( agreementId );
833 }
834
835
836
837
838 @Override
839 public void setContext( AgreementContextType context )
840 {
841 agreementInstance.setContext( context );
842 }
843
844
845
846
847 @Override
848 public void setName( String name )
849 {
850 agreementInstance.setName( name );
851
852 }
853
854
855
856
857 @Override
858 public void setTerms( TermTreeType terms )
859 {
860 agreementInstance.setTerms( terms );
861 }
862
863
864
865
866 @Override
867 public void setState( AgreementStateType agreementState )
868 {
869 agreementInstance.setState( agreementState );
870 }
871
872
873
874
875 @Override
876 public void setGuaranteeTermStates( GuaranteeTermStateType[] guaranteeTermStateList )
877 {
878 agreementInstance.setGuaranteeTermStates( guaranteeTermStateList );
879 }
880
881
882
883
884 @Override
885 public void setServiceTermStates( ServiceTermStateType[] serviceTermStateList )
886 {
887 agreementInstance.setServiceTermStates( serviceTermStateList );
888 }
889
890
891
892
893 @Override
894 public AgreementPropertiesType getXMLObject()
895 {
896 return agreementInstance.getXMLObject();
897 }
898
899
900
901
902 @Override
903 public void setXmlObject( AgreementPropertiesType properties )
904 {
905 agreementInstance.setXmlObject( properties );
906 }
907
908
909
910
911 @Override
912 public Map<String, Object> getTransientExecutionContext()
913 {
914 return agreementInstance.getTransientExecutionContext();
915 }
916
917
918
919
920 @Override
921 public Class getImplementationClass()
922 {
923 return agreementInstance.getImplementationClass();
924 }
925
926
927
928
929
930
931 @Override
932 public Map<String, XmlObject> getExecutionContext()
933 {
934 return agreementInstance.getExecutionContext();
935 }
936 }