@@ -239,14 +239,12 @@ static void thread_disp_finalizer(SEXP xptr) {
239
239
if (NANO_PTR (xptr ) == NULL ) return ;
240
240
nano_thread_disp * xp = (nano_thread_disp * ) NANO_PTR (xptr );
241
241
nano_cv * ncv = xp -> cv ;
242
- if (ncv != NULL ) {
243
- nng_mtx * mtx = ncv -> mtx ;
244
- nng_cv * cv = ncv -> cv ;
245
- nng_mtx_lock (mtx );
246
- ncv -> condition = -1 ;
247
- nng_cv_wake (cv );
248
- nng_mtx_unlock (mtx );
249
- }
242
+ nng_mtx * mtx = ncv -> mtx ;
243
+ nng_cv * cv = ncv -> cv ;
244
+ nng_mtx_lock (mtx );
245
+ ncv -> condition = -1 ;
246
+ nng_cv_wake (cv );
247
+ nng_mtx_unlock (mtx );
250
248
if (xp -> tls != NULL ) {
251
249
nng_tls_config_free (xp -> tls );
252
250
}
@@ -266,6 +264,9 @@ static void thread_disp_finalizer(SEXP xptr) {
266
264
R_Free (xp -> haio );
267
265
R_Free (xp -> url );
268
266
R_Free (xp -> online );
267
+ nng_cv_free (ncv -> cv );
268
+ nng_mtx_free (ncv -> mtx );
269
+ R_Free (xp -> cv );
269
270
R_Free (xp );
270
271
271
272
}
@@ -547,7 +548,7 @@ static void rnng_dispatch_thread(void *args) {
547
548
if (nng_rep0_open (& hsock ))
548
549
goto exitlevel1 ;
549
550
550
- if (nng_dial (hsock , disp -> host , & hdial , NNG_FLAG_NONBLOCK ))
551
+ if (nng_dial (hsock , disp -> host , & hdial , 0 ))
551
552
goto exitlevel2 ;
552
553
553
554
for (R_xlen_t i = 0 ; i < n ; i ++ ) {
@@ -675,12 +676,17 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
675
676
Rf_error ("'tls' is not a valid TLS Configuration" );
676
677
677
678
int xc ;
678
- SEXP cv , cvt , xptr , sock , list ;
679
+ SEXP xptr , sock , list ;
680
+
681
+ nano_cv * ncv = R_Calloc (1 , nano_cv );
682
+ if ((xc = nng_mtx_alloc (& ncv -> mtx )))
683
+ goto exitlevel1 ;
684
+
685
+ if ((xc = nng_cv_alloc (& ncv -> cv , ncv -> mtx )))
686
+ goto exitlevel2 ;
679
687
680
- PROTECT (cvt = rnng_cv_alloc ());
681
- nano_cv * tcv = (nano_cv * ) NANO_PTR (cvt );
682
688
nano_thread_disp * disp = R_Calloc (1 , nano_thread_disp );
683
- disp -> cv = tcv ;
689
+ disp -> cv = ncv ;
684
690
disp -> n = nd ;
685
691
disp -> tls = sec ? (nng_tls_config * ) NANO_PTR (tls ) : NULL ;
686
692
if (sec ) nng_tls_config_hold (disp -> tls );
@@ -703,22 +709,18 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
703
709
nano_listener * hl = R_Calloc (1 , nano_listener );
704
710
705
711
if (nng_url_parse (& disp -> up , disp -> url [0 ]))
706
- goto exitlevel1 ;
712
+ goto exitlevel3 ;
707
713
708
714
if ((xc = nng_req0_open (hsock )))
709
- goto exitlevel2 ;
715
+ goto exitlevel4 ;
710
716
711
- PROTECT (cv = rnng_cv_alloc ());
712
- nano_cv * ncv = (nano_cv * ) NANO_PTR (cv );
713
717
if ((xc = nng_socket_set_ms (* hsock , "req:resend-time" , 0 )) ||
714
- (xc = nng_pipe_notify (* hsock , NNG_PIPE_EV_ADD_POST , pipe_cb_signal , ncv )) ||
715
718
(xc = nng_listen (* hsock , disp -> host , & hl -> list , 0 )) ||
716
719
(xc = nng_thread_create (& disp -> thr , rnng_dispatch_thread , disp )))
717
- goto exitlevel3 ;
720
+ goto exitlevel5 ;
718
721
719
722
PROTECT (sock = R_MakeExternalPtr (hsock , nano_SocketSymbol , R_NilValue ));
720
723
R_RegisterCFinalizerEx (sock , socket_finalizer , TRUE);
721
- Rf_setAttrib (sock , nano_CvSymbol , cvt );
722
724
723
725
xptr = R_MakeExternalPtr (disp , nano_SocketSymbol , R_NilValue );
724
726
Rf_setAttrib (sock , R_MissingArg , xptr );
@@ -728,21 +730,14 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
728
730
Rf_setAttrib (sock , nano_ListenerSymbol , list );
729
731
R_RegisterCFinalizerEx (list , listener_finalizer , TRUE);
730
732
731
- rnng_cv_wait (cv );
732
- if ((xc = nng_pipe_notify (* hsock , NNG_PIPE_EV_ADD_POST , NULL , NULL )))
733
- goto exitlevel4 ;
734
-
735
- UNPROTECT (3 );
733
+ UNPROTECT (1 );
736
734
return sock ;
737
735
738
- exitlevel4 :
739
- UNPROTECT (1 );
740
- exitlevel3 :
736
+ exitlevel5 :
741
737
nng_close (* hsock );
742
- UNPROTECT (1 );
743
- exitlevel2 :
738
+ exitlevel4 :
744
739
nng_url_free (disp -> up );
745
- exitlevel1 :
740
+ exitlevel3 :
746
741
R_Free (hl );
747
742
R_Free (hsock );
748
743
for (R_xlen_t i = 0 ; i < nd ; i ++ ) {
@@ -758,7 +753,11 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
758
753
R_Free (disp -> online );
759
754
if (sec ) nng_tls_config_free (disp -> tls );
760
755
R_Free (disp );
761
- UNPROTECT (1 );
756
+ nng_cv_free (ncv -> cv );
757
+ exitlevel2 :
758
+ nng_mtx_free (ncv -> mtx );
759
+ R_Free (ncv );
760
+ exitlevel1 :
762
761
ERROR_OUT (xc );
763
762
764
763
}
0 commit comments