Sophie

Sophie

distrib > Mageia > 4 > x86_64 > by-pkgid > f7c0b847fd93ae89b7a85fb3abb004f2 > files > 118

ocaml-lwt-devel-2.4.3-3.mga4.x86_64.rpm

(* Lightweight thread library for Objective Caml
 * http://www.ocsigen.org/lwt
 * Module Test_lwt_stream
 * Copyright (C) 2009 Jérémie Dimino
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, with linking exceptions;
 * either version 2.1 of the License, or (at your option) any later
 * version. See COPYING file for details.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 *)

open Lwt
open Test

let suite = suite "lwt_stream" [
  test "from"
    (fun () ->
       let mvar = Lwt_mvar.create_empty () in
       let stream = Lwt_stream.from (fun () ->
                                       lwt x = Lwt_mvar.take mvar in
                                       return (Some x)) in
       let t1 = Lwt_stream.next stream in
       let t2 = Lwt_stream.next stream in
       let t3 = Lwt_stream.next stream in
       lwt () = Lwt_mvar.put mvar 1 in
       lwt x1 = t1 and x2 = t2 and x3 = t3 in
       return ([x1; x2; x3] = [1; 1; 1]));

  test "of_list"
    (fun () ->
       let stream = Lwt_stream.of_list [1; 2; 3] in
       lwt x1 = Lwt_stream.next stream in
       lwt x2 = Lwt_stream.next stream in
       lwt x3 = Lwt_stream.next stream in
       return ([x1; x2; x3] = [1; 2; 3]));

  test "clone"
    (fun () ->
       let stream1 = Lwt_stream.of_list [1; 2; 3] in
       let stream2 = Lwt_stream.clone stream1 in
       lwt x1_1 = Lwt_stream.next stream1 in
       lwt x2_1 = Lwt_stream.next stream2 in
       lwt x1_2 = Lwt_stream.next stream1
       and x1_3 = Lwt_stream.next stream1
       and x2_2 = Lwt_stream.next stream2
       and x2_3 = Lwt_stream.next stream2 in
       return ([x1_1; x1_2; x1_3] = [1; 2; 3] && [x2_1; x2_2; x2_3] = [1; 2; 3]));

  test "clone 2"
    (fun () ->
       let stream1, push = Lwt_stream.create () in
       push (Some 1);
       let stream2 = Lwt_stream.clone stream1 in
       let x1_1 = poll (Lwt_stream.next stream1) in
       let x1_2 = poll (Lwt_stream.next stream1) in
       let x2_1 = poll (Lwt_stream.next stream2) in
       let x2_2 = poll (Lwt_stream.next stream2) in
       return ([x1_1;x1_2;x2_1;x2_2] = [Some 1;None;Some 1;None]));

  test "create"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       push (Some 1);
       push (Some 2);
       push (Some 3);
       push None;
       lwt l = Lwt_stream.to_list stream in
       return (l = [1; 2; 3]));

  test "create 2"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       push None;
       let t = Lwt_stream.next stream in
       return (Lwt.state t = Fail Lwt_stream.Empty));

  test "create_bounded"
    (fun () ->
       let stream, push = Lwt_stream.create_bounded 3 in
       let acc = true in
       let acc = acc && state (push#push 1) = Return () in
       let acc = acc && state (push#push 2) = Return () in
       let acc = acc && state (push#push 3) = Return () in
       let t = push#push 4 in
       let acc = acc && state t = Sleep in
       let acc = acc && state (push#push 5) = Fail Lwt_stream.Full in
       let acc = acc && state (push#push 6) = Fail Lwt_stream.Full in
       let acc = acc && state (Lwt_stream.get stream) = Return (Some 1) in
       (* Lwt_stream uses wakeup_later so we have to wait a bit. *)
       lwt () = Lwt_unix.yield () in
       let acc = acc && state t = Return () in
       let acc = acc && state (Lwt_stream.get stream) = Return (Some 2) in
       let acc = acc && state (push#push 7) = Return () in
       push#close;
       let acc = acc && state (push#push 8) = Fail Lwt_stream.Closed in
       let acc = acc && state (Lwt_stream.to_list stream) = Return [3; 4; 7] in
       return acc);

  test "get_while"
    (fun () ->
       let stream = Lwt_stream.of_list [1; 2; 3; 4; 5] in
       lwt l1 = Lwt_stream.get_while (fun x -> x < 3) stream in
       lwt l2 = Lwt_stream.to_list stream in
       return (l1 = [1; 2] && l2 = [3; 4; 5]));

  test "peek"
    (fun () ->
       let stream = Lwt_stream.of_list [1; 2; 3; 4; 5] in
       lwt x = Lwt_stream.peek stream in
       lwt y = Lwt_stream.peek stream in
       lwt l = Lwt_stream.to_list stream in
       return (x = Some 1 && y = Some 1 && l = [1; 2; 3; 4; 5]));

  test "npeek"
    (fun () ->
       let stream = Lwt_stream.of_list [1; 2; 3; 4; 5] in
       lwt x = Lwt_stream.npeek 3 stream in
       lwt y = Lwt_stream.npeek 1 stream in
       lwt l = Lwt_stream.to_list stream in
       return (x = [1; 2; 3] && y = [1] && l = [1; 2; 3; 4; 5]));

  test "get_available"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       push (Some 1);
       push (Some 2);
       push (Some 3);
       let l = Lwt_stream.get_available stream in
       push (Some 4);
       lwt x = Lwt_stream.get stream in
       return (l = [1; 2; 3] && x = Some 4));

  test "get_available_up_to"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       push (Some 1);
       push (Some 2);
       push (Some 3);
       push (Some 4);
       let l = Lwt_stream.get_available_up_to 2 stream in
       lwt x = Lwt_stream.get stream in
       return (l = [1; 2] && x = Some 3));

  test "filter"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       push (Some 1);
       push (Some 2);
       push (Some 3);
       push (Some 4);
       let filtered = Lwt_stream.filter ((=) 3) stream in
       lwt x = Lwt_stream.get filtered in
       let l = Lwt_stream.get_available filtered in
       return (x = Some 3 && l = []));

  test "filter_map"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       push (Some 1);
       push (Some 2);
       push (Some 3);
       push (Some 4);
       let filtered = Lwt_stream.filter_map (function 3 ->  Some "3" | _ -> None ) stream in
       lwt x = Lwt_stream.get filtered in
       let l = Lwt_stream.get_available filtered in
       return (x = Some "3" && l = []));

  test "last_new"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       push (Some 1);
       push (Some 2);
       push (Some 3);
       lwt x = Lwt_stream.last_new stream in
       return (x = 3));

  test "cancel push stream 1"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       let t = Lwt_stream.next stream in
       cancel t;
       return (state t = Fail Canceled));

  test "cancel push stream 2"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       let t = Lwt_stream.next stream in
       cancel t;
       push (Some 1);
       let t' = Lwt_stream.next stream in
       return (state t' = Return 1));

  test "cancel push stream 3"
    (fun () ->
       let stream, push = Lwt_stream.create () in
       let t1 = Lwt_stream.next stream in
       let t2 = Lwt_stream.next stream in
       cancel t1;
       push (Some 1);
       return (state t1 = Fail Canceled && state t2 = Return 1));

  (* check if the push function keeps references to the elements in
     the stream *)
  test "push and GC"
    (fun () ->
       let w = Weak.create 5 in
       (* Count the number of reachable elements in the stream. *)
       let count () =
         let rec loop acc idx =
           if idx = Weak.length w then
             acc
           else
             match Weak.get w idx with
               | None -> loop acc (idx + 1)
               | Some v -> loop (acc + 1) (idx + 1)
         in
         loop 0 0
       in
       (* Run some test and return the push function of the stream. *)
       let test () =
         let stream, push = Lwt_stream.create () in
         assert (count () = 0);
         let r1 = Some(ref 1) in
         push r1;
         Weak.set w 1 r1;
         let r2 = Some(ref 2) in
         push r2;
         Weak.set w 2 r2;
         let r3 = Some(ref 3) in
         push r3;
         Weak.set w 3 r3;
         assert (count () = 3);
         assert (state (Lwt_stream.next stream) = Return {contents = 1});
         Gc.full_major ();
         (* Ocaml can consider that stream is unreachable before the
            next line, hence freeing the whole data. *)
         assert (count () <= 3);
         push
       in
       let push = test () in
       Gc.full_major ();
       (* At this point [stream] is unreachable. *)
       assert (count () = 0);
       (* We have that to force caml to keep a reference on [push]. *)
       push (Some(ref 4));
       return true);

  test "map_exn"
    (fun () ->
       let open Lwt_stream in
       let l = [Value 1; Error Exit; Error (Failure "plop"); Value 42; Error End_of_file] in
       let q = ref l in
       let stream =
         Lwt_stream.from
           (fun () ->
              match !q with
                | [] ->
                    return None
                | Value x :: l ->
                    q := l;
                    return (Some x)
                | Error e :: l ->
                    q := l;
                    raise_lwt e)
       in
       lwt l' = Lwt_stream.to_list (Lwt_stream.map_exn stream) in
       return (l = l'));

  test "on_terminate"
    (fun () ->
      let st = Lwt_stream.of_list [1; 2] in
      let b = ref false in
      Lwt_stream.on_terminate st (fun () -> b := true);
      ignore (Lwt_stream.peek st);
      let b1 = !b = false in
      ignore (Lwt_stream.junk st);
      ignore (Lwt_stream.peek st);
      let b2 = !b = false in
      ignore (Lwt_stream.junk st);
      ignore (Lwt_stream.peek st);
      let b3 = !b = true in
      Lwt.return (b1 && b2 && b3));
]